Merge origin/pro6000_xh: differential drive, record/replay tools, scene assets

Conflicts resolved:
- assets.py: keep /home/tangger/LYT path (ours)
- mindrobot_cfg.py: keep our initial poses + fix path separator \\ → /
- mindrobot_left_arm_ik_env_cfg.py: keep our wheel_action using
  MINDBOT_WHEEL_JOINTS constant (cleaner than remote's hardcoded list)

From origin/pro6000_xh:
- scripts/tools/: record_demos.py, replay_demos.py, hdf5 tools (Isaac Lab copy)
- assets/: dryingbox.py, lab.py, table.py scene asset definitions
- tele_se3_with_wheel_agent.py: WheelKeyboard inlined, differential drive keys

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-14 13:44:02 +08:00
25 changed files with 3709 additions and 87 deletions

View File

@@ -0,0 +1,143 @@
# scripts/environments/teleoperation/mindrobot_keyboard.py
# Copyright (c) 2022-2026, The Isaac Lab Project Developers.
# SPDX-License-Identifier: BSD-3-Clause
"""Shared keyboard controllers for MindRobot teleoperation and demo recording.
This module contains ONLY class definitions (no argparse, no AppLauncher),
so it is safe to import from both standalone scripts and from record_demos.py.
"""
import weakref
import numpy as np
import torch
import carb
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
class WheelKeyboard:
    """Differential-drive (skid-steer) keyboard controller.

    Listens to arrow keys via Carb and produces a 4-D joint-velocity
    command: [right_b, left_b, left_f, right_f] (rad/s).

    Key mappings
    ─────────────────────────────────────────────────────
    ↑ (UP) forward [ v, v, v, v]
    ↓ (DOWN) backward [-v, -v, -v, -v]
    ← (LEFT) left turn [ v, -v, -v, v]
    → (RIGHT) right turn[-v, v, v, -v]
    ─────────────────────────────────────────────────────
    """

    # Arrow keys this controller reacts to (informational; `_key_map` is authoritative).
    _WHEEL_KEYS = {"UP", "DOWN", "LEFT", "RIGHT"}

    def __init__(self, wheel_speed: float = 5.0, sim_device: str = "cuda:0"):
        """Create the controller and subscribe to Carb keyboard events.

        Args:
            wheel_speed: Per-wheel velocity magnitude (rad/s) applied per held key.
            sim_device: Torch device on which :meth:`advance` returns the command.
        """
        self.wheel_speed = wheel_speed
        self._sim_device = sim_device
        # 4-D velocity accumulator: [right_b, left_b, left_f, right_f]
        self._wheel_vel = np.zeros(4)
        # key name -> velocity delta; filled by _create_bindings()
        self._key_map: dict[str, np.ndarray] = {}
        self._create_bindings()
        # Deferred import: omni is only available inside a running Isaac Sim app.
        import omni
        self._appwindow = omni.appwindow.get_default_app_window()
        self._input = carb.input.acquire_input_interface()
        self._keyboard = self._appwindow.get_keyboard()
        # weakref.proxy avoids a reference cycle that would keep `self` alive
        # through the subscription callback.
        self._keyboard_sub = self._input.subscribe_to_keyboard_events(
            self._keyboard,
            lambda event, *args, obj=weakref.proxy(self): obj._on_keyboard_event(event, *args),
        )

    def __del__(self):
        # Unsubscribe so the carb callback does not outlive this object.
        self._input.unsubscribe_to_keyboard_events(self._keyboard, self._keyboard_sub)

    def __str__(self) -> str:
        """Human-readable key-binding summary."""
        return (
            "WheelKeyboard (skid-steer chassis controller)\n"
            "\t↑ UP — forward\n"
            "\t↓ DOWN — backward\n"
            "\t← LEFT — left turn (in-place)\n"
            "\t→ RIGHT — right turn (in-place)"
        )

    def reset(self):
        """Zero the accumulated wheel-velocity command."""
        self._wheel_vel = np.zeros(4)

    def advance(self) -> torch.Tensor:
        """Return the current 4-D wheel velocity [right_b, left_b, left_f, right_f].

        The `.copy()` detaches the returned tensor from the live accumulator so
        later key events do not mutate a command already handed to the caller.
        """
        return torch.tensor(self._wheel_vel.copy(), dtype=torch.float32, device=self._sim_device)

    def _create_bindings(self):
        """Populate the key -> velocity-delta map from the configured speed."""
        v = self.wheel_speed
        # vectors ordered [right_b, left_b, left_f, right_f]
        self._key_map = {
            "UP": np.array([ v, v, v, v]),
            "DOWN": np.array([-v, -v, -v, -v]),
            "LEFT": np.array([ v, -v, -v, v]),
            "RIGHT": np.array([-v, v, v, -v]),
        }

    def _on_keyboard_event(self, event, *args, **kwargs):
        # Press adds the key's delta and release subtracts the same delta, so
        # simultaneously held keys compose and the net command returns to zero
        # once all keys are released.
        if event.type == carb.input.KeyboardEventType.KEY_PRESS:
            key = event.input.name
            if key in self._key_map:
                self._wheel_vel += self._key_map[key]
        if event.type == carb.input.KeyboardEventType.KEY_RELEASE:
            key = event.input.name
            if key in self._key_map:
                self._wheel_vel -= self._key_map[key]
        # NOTE(review): returning True appears to let the event propagate to
        # other subscribers (carb convention) — confirm against carb.input docs.
        return True
class MindRobotCombinedKeyboard:
    """Combined teleoperation controller: left-arm IK (Se3Keyboard, 7-D) plus
    chassis wheel velocities (WheelKeyboard, 4-D).

    Produces an 11-D action: [arm_IK(6) | wheel_vel(4) | gripper(1)],
    matching the declaration order in MindRobotTeleopActionsCfg.

    Key bindings
    ─────────────────────────────────────────────
    W/S/A/D/Q/E — arm IK translate X/Y/Z
    Z/X T/G C/V — arm IK rotate roll/pitch/yaw
    K — gripper toggle open/close
    ↑↓←→ — chassis forward/backward/turn
    R — reset
    ─────────────────────────────────────────────
    """

    def __init__(
        self,
        pos_sensitivity: float = 0.05,
        rot_sensitivity: float = 0.05,
        wheel_speed: float = 5.0,
        sim_device: str = "cuda:0",
    ):
        """Build the two sub-controllers.

        Args:
            pos_sensitivity: Translation step size for the arm Se3Keyboard.
            rot_sensitivity: Rotation step size for the arm Se3Keyboard.
            wheel_speed: Per-wheel velocity magnitude (rad/s) for the chassis.
            sim_device: Torch device of the tensors returned by :meth:`advance`.
        """
        self._arm = Se3Keyboard(
            Se3KeyboardCfg(
                pos_sensitivity=pos_sensitivity,
                rot_sensitivity=rot_sensitivity,
                sim_device=sim_device,
            )
        )
        self._wheel = WheelKeyboard(wheel_speed=wheel_speed, sim_device=sim_device)

    def reset(self):
        """Reset both sub-controllers to a zero command."""
        self._arm.reset()
        self._wheel.reset()

    def add_callback(self, key: str, callback):
        """Delegate callbacks to the arm keyboard (Se3Keyboard supports arbitrary key callbacks)."""
        self._arm.add_callback(key, callback)

    def advance(self) -> torch.Tensor:
        """Return the combined 11-D action for the current key state."""
        arm_cmd = self._arm.advance()  # (7,): [dx,dy,dz,rx,ry,rz, gripper]
        arm_ik = arm_cmd[:6]  # (6,)
        gripper = arm_cmd[6:7]  # (1,)
        wheel_vel = self._wheel.advance()  # (4,)
        # NOTE(review): torch.cat requires all parts on one device — both
        # sub-controllers are constructed with the same `sim_device` above.
        return torch.cat([arm_ik, wheel_vel, gripper])  # (11,)

    def __str__(self) -> str:
        """Human-readable key-binding summary."""
        return (
            "MindRobotCombinedKeyboard\n"
            " Arm (Se3Keyboard): W/S/A/D/Q/E + Z/X/T/G/C/V, K=gripper\n"
            " Wheel (arrows): ↑↓←→\n"
            " R = reset | L = save-success-and-reset"
        )

View File

@@ -13,8 +13,6 @@ Extends teleop_se3_agent.py with differential-drive keyboard control:
"""
import argparse
import weakref
import numpy as np
import torch
from isaaclab.app import AppLauncher
@@ -35,98 +33,20 @@ simulation_app = app_launcher.app
import logging
import gymnasium as gym
import carb
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
from isaaclab.envs import ManagerBasedRLEnvCfg
from isaaclab.managers import TerminationTermCfg as DoneTerm
import mindbot.tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# mindrobot_keyboard.py lives in the same directory as this script.
# Python adds the script's own directory to sys.path, so a direct import works.
from mindrobot_keyboard import WheelKeyboard, MindRobotCombinedKeyboard # noqa: E402
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────
# WheelKeyboard: listens to the arrow keys, outputs a 4-D wheel-velocity tensor
# ─────────────────────────────────────────────────────────────
class WheelKeyboard:
    """Differential-drive (skid-steer) keyboard controller.

    Listens to arrow keys via Carb and produces a 4-D joint-velocity
    command: [right_b, left_b, left_f, right_f] (rad/s).

    Key mappings
    ─────────────────────────────────────────────────────
    ↑ (UP) forward [ v, v, v, v]
    ↓ (DOWN) backward [-v, -v, -v, -v]
    ← (LEFT) left turn [ v, -v, -v, v] right wheels fwd
    → (RIGHT) right turn[-v, v, v, -v] left wheels fwd
    ─────────────────────────────────────────────────────
    """

    # Arrow keys this controller reacts to (informational; `_key_map` is authoritative).
    _WHEEL_KEYS = {"UP", "DOWN", "LEFT", "RIGHT"}

    def __init__(self, wheel_speed: float = 5.0, sim_device: str = "cuda:0"):
        """Create the controller and subscribe to Carb keyboard events.

        Args:
            wheel_speed: Per-wheel velocity magnitude (rad/s) applied per held key.
            sim_device: Torch device on which :meth:`advance` returns the command.
        """
        self.wheel_speed = wheel_speed
        self._sim_device = sim_device
        # 4-D velocity buffer: [right_b, left_b, left_f, right_f]
        self._wheel_vel = np.zeros(4)
        # key -> velocity-vector mapping (filled in _create_bindings)
        self._key_map: dict[str, np.ndarray] = {}
        self._create_bindings()
        # subscribe to Carb keyboard events; deferred import because omni is
        # only importable inside a running Isaac Sim app
        import omni
        self._appwindow = omni.appwindow.get_default_app_window()
        self._input = carb.input.acquire_input_interface()
        self._keyboard = self._appwindow.get_keyboard()
        # weakref.proxy avoids a reference cycle through the subscription callback
        self._keyboard_sub = self._input.subscribe_to_keyboard_events(
            self._keyboard,
            lambda event, *args, obj=weakref.proxy(self): obj._on_keyboard_event(event, *args),
        )

    def __del__(self):
        # Unsubscribe so the carb callback does not outlive this object.
        self._input.unsubscribe_to_keyboard_events(self._keyboard, self._keyboard_sub)

    def __str__(self) -> str:
        """Human-readable key-binding summary."""
        return (
            "WheelKeyboard (skid-steer chassis controller)\n"
            "\t↑ UP — forward\n"
            "\t↓ DOWN — backward\n"
            "\t← LEFT — left turn (in-place)\n"
            "\t→ RIGHT — right turn (in-place)"
        )

    def reset(self):
        """Zero the accumulated wheel-velocity command."""
        self._wheel_vel = np.zeros(4)

    def advance(self) -> torch.Tensor:
        """Returns 4-D wheel velocity tensor [right_b, left_b, left_f, right_f]."""
        # `.copy()` detaches the returned tensor from the live accumulator.
        return torch.tensor(self._wheel_vel.copy(), dtype=torch.float32, device=self._sim_device)

    def _create_bindings(self):
        """Populate the key -> velocity-delta map from the configured speed."""
        v = self.wheel_speed
        # [right_b, left_b, left_f, right_f]
        self._key_map = {
            "UP": np.array([ v, v, v, v]),  # forward
            "DOWN": np.array([-v, -v, -v, -v]),  # backward
            "LEFT": np.array([ v, -v, -v, v]),  # left turn
            "RIGHT": np.array([-v, v, v, -v]),  # right turn
        }

    def _on_keyboard_event(self, event, *args, **kwargs):
        # Press adds and release subtracts the same delta, so held keys compose
        # and the net command returns to zero when all keys are released.
        if event.type == carb.input.KeyboardEventType.KEY_PRESS:
            key = event.input.name
            if key in self._key_map:
                self._wheel_vel += self._key_map[key]
        if event.type == carb.input.KeyboardEventType.KEY_RELEASE:
            key = event.input.name
            if key in self._key_map:
                self._wheel_vel -= self._key_map[key]
        # NOTE(review): True appears to let the event propagate to other
        # subscribers (carb convention) — confirm against carb.input docs.
        return True
# ─────────────────────────────────────────────────────────────
# main
# ─────────────────────────────────────────────────────────────
@@ -201,4 +121,4 @@ def main() -> None:
if __name__ == "__main__":
main()
simulation_app.close()
simulation_app.close()

View File

@@ -0,0 +1,100 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Convert a mesh file to `.obj` using blender.
This file processes a given dae mesh file and saves the resulting mesh file in obj format.
It needs to be called using the python packaged with blender, i.e.:
blender --background --python blender_obj.py -- -in_file FILE -out_file FILE
For more information: https://docs.blender.org/api/current/index.html
The script was tested on Blender 3.2 on Ubuntu 20.04LTS.
"""
import os
import sys
import bpy
def parse_cli_args():
    """Parse the command-line arguments passed to blender after the ``--`` separator.

    Blender ignores everything after ``--`` on its own command line, so those
    tokens are free for this script to consume.

    Returns:
        The parsed argument namespace, or ``None`` when no usable arguments
        were provided (the help text is printed in that case).
    """
    import argparse

    # Only the tokens after "--" belong to this script; everything before is
    # consumed by blender itself.
    argv = sys.argv
    if "--" not in argv:
        argv = []  # behave as if no script args were passed
    else:
        argv = argv[argv.index("--") + 1 :]  # get all args after "--"
    # When --help or no args are given, print this help
    usage_text = (
        f"Run blender in background mode with this script:\n\tblender --background --python {__file__} -- [options]"
    )
    parser = argparse.ArgumentParser(description=usage_text)
    # Add arguments. The input is a .dae/.stl mesh (see convert_to_obj), not an
    # OBJ file — the previous help text was wrong about that.
    parser.add_argument(
        "-i", "--in_file", metavar="FILE", type=str, required=True, help="Path to input mesh file (.dae/.stl)."
    )
    parser.add_argument("-o", "--out_file", metavar="FILE", type=str, required=True, help="Path to output OBJ file.")
    args = parser.parse_args(argv)
    # Defensive: with no args at all, print help and signal "nothing to do".
    if not argv or not args.in_file or not args.out_file:
        parser.print_help()
        return None
    # return arguments
    return args
def convert_to_obj(in_file: str, out_file: str, save_usd: bool = False):
    """Convert a ``.dae``/``.stl`` mesh file to ``.obj`` using blender.

    Args:
        in_file: Input mesh file to process (``.dae`` or ``.stl``).
        out_file: Path to store the output obj file. A ``.obj`` suffix is
            appended when missing.
        save_usd: If True, additionally export the scene as ``.usd`` next to
            the obj output.

    Raises:
        FileNotFoundError: If ``in_file`` does not exist.
        ValueError: If ``in_file`` is not in dae/stl format.
    """
    # check valid input file
    if not os.path.exists(in_file):
        raise FileNotFoundError(in_file)
    # add ending of file format
    if not out_file.endswith(".obj"):
        out_file += ".obj"
    # create the destination directory if needed; guard against an empty
    # dirname (a bare filename) which would make os.makedirs("") raise
    out_dir = os.path.dirname(out_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # reset scene to empty
    bpy.ops.wm.read_factory_settings(use_empty=True)
    # load object into scene
    if in_file.endswith(".dae"):
        bpy.ops.wm.collada_import(filepath=in_file)
    elif in_file.endswith(".stl") or in_file.endswith(".STL"):
        bpy.ops.import_mesh.stl(filepath=in_file)
    else:
        raise ValueError(f"Input file not in dae/stl format: {in_file}")
    # convert to obj format and store with z up
    # TODO: Read the convention from dae file instead of manually fixing it.
    # Reference: https://docs.blender.org/api/2.79/bpy.ops.export_scene.html
    bpy.ops.export_scene.obj(
        filepath=out_file, check_existing=False, axis_forward="Y", axis_up="Z", global_scale=1, path_mode="RELATIVE"
    )
    # save it as usd as well
    if save_usd:
        # Replace only the extension. The previous str.replace("obj", "usd")
        # corrupted any path containing "obj" elsewhere (e.g. ".../objects/").
        out_file = os.path.splitext(out_file)[0] + ".usd"
        bpy.ops.wm.usd_export(filepath=out_file, check_existing=False)
if __name__ == "__main__":
    # Entry point when run under `blender --background --python blender_obj.py -- ...`.
    # read arguments
    cli_args = parse_cli_args()
    # bail out quietly when no usable args were given (help already printed)
    if cli_args is None:
        sys.exit()
    # process via blender
    convert_to_obj(cli_args.in_file, cli_args.out_file)

View File

@@ -0,0 +1,134 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script uses the cloner API to check if asset has been instanced properly.
Usage with different inputs (replace `<Asset-Path>` and `<Asset-Path-Instanced>` with the path to the
original asset and the instanced asset respectively):
```bash
./isaaclab.sh -p source/tools/check_instanceable.py <Asset-Path> -n 4096 --headless --physics
./isaaclab.sh -p source/tools/check_instanceable.py <Asset-Path-Instanced> -n 4096 --headless --physics
./isaaclab.sh -p source/tools/check_instanceable.py <Asset-Path> -n 4096 --headless
./isaaclab.sh -p source/tools/check_instanceable.py <Asset-Path-Instanced> -n 4096 --headless
```
Output from the above commands:
```bash
>>> Cloning time (cloner.clone): 0.648198 seconds
>>> Setup time (sim.reset): : 5.843589 seconds
[#clones: 4096, physics: True] Asset: <Asset-Path-Instanced> : 6.491870 seconds
>>> Cloning time (cloner.clone): 0.693133 seconds
>>> Setup time (sim.reset): 50.860526 seconds
[#clones: 4096, physics: True] Asset: <Asset-Path> : 51.553743 seconds
>>> Cloning time (cloner.clone) : 0.687201 seconds
>>> Setup time (sim.reset) : 6.302215 seconds
[#clones: 4096, physics: False] Asset: <Asset-Path-Instanced> : 6.989500 seconds
>>> Cloning time (cloner.clone) : 0.678150 seconds
>>> Setup time (sim.reset) : 52.854054 seconds
[#clones: 4096, physics: False] Asset: <Asset-Path> : 53.532287 seconds
```
"""
"""Launch Isaac Sim Simulator first."""
import argparse
import contextlib
import os
from isaaclab.app import AppLauncher
# add argparse arguments
# NOTE: the description must be passed via the `description` keyword — a bare
# positional string sets `prog` instead, which mangles the --help output.
parser = argparse.ArgumentParser(description="Utility to empirically check if asset is instanced properly.")
parser.add_argument("input", type=str, help="The path to the USD file.")
parser.add_argument("-n", "--num_clones", type=int, default=128, help="Number of clones to spawn.")
parser.add_argument("-s", "--spacing", type=float, default=1.5, help="Spacing between instances in a grid.")
parser.add_argument("-p", "--physics", action="store_true", default=False, help="Clone assets using physics cloner.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
from isaacsim.core.cloner import GridCloner
import isaaclab.sim as sim_utils
from isaaclab.sim import SimulationCfg, SimulationContext
from isaaclab.utils import Timer
from isaaclab.utils.assets import check_file_path
def main():
    """Spawn the USD asset and clone it ``args_cli.num_clones`` times via the GridCloner.

    Prints timing for cloning and ``sim.reset()`` via :class:`Timer`; a large
    setup-time gap between an instanced and a non-instanced asset indicates
    the asset is not properly instanceable (see the module docstring's sample
    output).
    """
    # check valid file path
    if not check_file_path(args_cli.input):
        raise ValueError(f"Invalid file path: {args_cli.input}")
    # Load kit helper
    sim = SimulationContext(SimulationCfg(dt=0.01))
    # get stage handle
    stage = sim_utils.get_current_stage()
    # enable fabric which avoids passing data over to USD structure
    # this speeds up the read-write operation of GPU buffers
    if sim.get_physics_context().use_gpu_pipeline:
        sim.get_physics_context().enable_fabric(True)
    # increase GPU buffer dimensions so the large clone count fits
    sim.get_physics_context().set_gpu_found_lost_aggregate_pairs_capacity(2**25)
    sim.get_physics_context().set_gpu_total_aggregate_pairs_capacity(2**21)
    # enable hydra scene-graph instancing
    # this is needed to visualize the scene when fabric is enabled
    sim._settings.set_bool("/persistent/omnihydra/useSceneGraphInstancing", True)
    # Create interface to clone the scene
    cloner = GridCloner(spacing=args_cli.spacing, stage=stage)
    cloner.define_base_env("/World/envs")
    stage.DefinePrim("/World/envs/env_0", "Xform")
    # Spawn things into stage
    sim_utils.create_prim("/World/Light", "DistantLight")
    # Everything under the namespace "/World/envs/env_0" will be cloned
    sim_utils.create_prim("/World/envs/env_0/Asset", "Xform", usd_path=os.path.abspath(args_cli.input))
    # Clone the scene
    num_clones = args_cli.num_clones
    # Outer timer measures total time; the nested timers break out the
    # cloning step and the simulation reset separately.
    with Timer(f"[#clones: {num_clones}, physics: {args_cli.physics}] Asset: {args_cli.input}"):
        # Clone the scene
        with Timer(">>> Cloning time (cloner.clone)"):
            cloner.define_base_env("/World/envs")
            envs_prim_paths = cloner.generate_paths("/World/envs/env", num_paths=num_clones)
            _ = cloner.clone(
                source_prim_path="/World/envs/env_0", prim_paths=envs_prim_paths, replicate_physics=args_cli.physics
            )
        # Play the simulator
        with Timer(">>> Setup time (sim.reset)"):
            sim.reset()
    # Simulate scene (if not headless); Ctrl-C exits the loop cleanly
    if not args_cli.headless:
        with contextlib.suppress(KeyboardInterrupt):
            while sim.is_playing():
                # perform step
                sim.step()
if __name__ == "__main__":
    # run the main function, then release the simulator app
    main()
    # close sim app
    simulation_app.close()

View File

@@ -0,0 +1,160 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Utility to bulk convert URDFs or mesh files into instanceable USD format.
Unified Robot Description Format (URDF) is an XML file format used in ROS to describe all elements of
a robot. For more information, see: http://wiki.ros.org/urdf
This script uses the URDF importer extension from Isaac Sim (``omni.isaac.urdf_importer``) to convert a
URDF asset into USD format. It is designed as a convenience script for command-line use. For more
information on the URDF importer, see the documentation for the extension:
https://docs.omniverse.nvidia.com/app_isaacsim/app_isaacsim/ext_omni_isaac_urdf.html
positional arguments:
input The path to the input directory containing URDFs and Meshes.
output The path to directory to store the instanceable files.
optional arguments:
-h, --help Show this help message and exit
--conversion-type Select file type to convert: urdf, mesh, or both. (default: both)
--merge-joints Consolidate links that are connected by fixed joints. (default: False)
--fix-base Fix the base to where it is imported. (default: False)
--make-instanceable Make the asset instanceable for efficient cloning. (default: False)
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Utility to convert a URDF or mesh into an Instanceable asset.")
parser.add_argument("input", type=str, help="The path to the input directory.")
parser.add_argument("output", type=str, help="The path to directory to store converted instanceable files.")
parser.add_argument(
    "--conversion-type", type=str, default="both", help="Select file type to convert, urdf, mesh, or both."
)
parser.add_argument(
    "--merge-joints",
    action="store_true",
    default=False,
    help="Consolidate links that are connected by fixed joints.",
)
parser.add_argument("--fix-base", action="store_true", default=False, help="Fix the base to where it is imported.")
# NOTE: with action="store_true" the default must be False — the previous
# default=True made the flag impossible to disable (it was always on),
# contradicting the module docstring which documents the default as False.
parser.add_argument(
    "--make-instanceable",
    action="store_true",
    default=False,
    help="Make the asset instanceable for efficient cloning.",
)
parser.add_argument(
    "--collision-approximation",
    type=str,
    default="convexDecomposition",
    choices=["convexDecomposition", "convexHull", "none"],
    help=(
        'The method used for approximating collision mesh. Set to "none" '
        "to not add a collision mesh to the converted mesh."
    ),
)
parser.add_argument(
    "--mass",
    type=float,
    default=None,
    help="The mass (in kg) to assign to the converted asset. If not provided, then no mass is added.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import os
from isaaclab.sim.converters import MeshConverter, MeshConverterCfg, UrdfConverter, UrdfConverterCfg
from isaaclab.sim.schemas import schemas_cfg
def main():
    """Recursively convert URDF and/or mesh files under ``args_cli.input`` to USD.

    Raises:
        ValueError: If ``--conversion-type`` is not one of urdf/mesh/both.
    """
    # normalize the requested conversion type
    conversion_type = args_cli.conversion_type.lower()
    if conversion_type not in ("urdf", "mesh", "both"):
        # a wrong selection is an error, not a warning — abort immediately
        # (was `raise Warning(...)`, which mis-uses the Warning class)
        raise ValueError("Conversion type is not valid, please select either 'urdf', 'mesh', or 'both'.")
    if not os.path.exists(args_cli.input):
        print(f"Error: The directory {args_cli.input} does not exist.")
        # nothing to walk; previously execution fell through to a silent no-op walk
        return
    # For each file and subsequent sub-directory
    for root, _dirs, files in os.walk(args_cli.input):
        # For each file
        for filename in files:
            stem, ext = os.path.splitext(filename)
            ext = ext.lower()
            # Check for URDF extensions
            if conversion_type in ("urdf", "both") and ext == ".urdf":
                # URDF converter call. asset_path must point at the walked
                # file — the previous garbled literal never resolved to a
                # real path on disk.
                urdf_converter_cfg = UrdfConverterCfg(
                    asset_path=os.path.join(root, filename),
                    usd_dir=f"{args_cli.output}/{stem}",
                    usd_file_name=f"{stem}.usd",
                    fix_base=args_cli.fix_base,
                    merge_fixed_joints=args_cli.merge_joints,
                    force_usd_conversion=True,
                    make_instanceable=args_cli.make_instanceable,
                )
                # Create Urdf converter and import the file
                urdf_converter = UrdfConverter(urdf_converter_cfg)
                print(f"Generated USD file: {urdf_converter.usd_path}")
            elif conversion_type in ("mesh", "both") and ext in (".fbx", ".obj", ".dae", ".stl"):
                # Mass properties: only attach mass/rigid-body schemas when a mass was given
                if args_cli.mass is not None:
                    mass_props = schemas_cfg.MassPropertiesCfg(mass=args_cli.mass)
                    rigid_props = schemas_cfg.RigidBodyPropertiesCfg()
                else:
                    mass_props = None
                    rigid_props = None
                # Collision properties ("none" disables collision-mesh generation)
                collision_props = schemas_cfg.CollisionPropertiesCfg(
                    collision_enabled=args_cli.collision_approximation != "none"
                )
                # Mesh converter call (asset_path fixed as above)
                mesh_converter_cfg = MeshConverterCfg(
                    mass_props=mass_props,
                    rigid_props=rigid_props,
                    collision_props=collision_props,
                    asset_path=os.path.join(root, filename),
                    force_usd_conversion=True,
                    usd_dir=f"{args_cli.output}/{stem}",
                    usd_file_name=f"{stem}.usd",
                    make_instanceable=args_cli.make_instanceable,
                    collision_approximation=args_cli.collision_approximation,
                )
                # Create mesh converter and import the file
                mesh_converter = MeshConverter(mesh_converter_cfg)
                print(f"Generated USD file: {mesh_converter.usd_path}")
if __name__ == "__main__":
    # run the conversion, then release the simulator app
    main()
    # close sim app
    simulation_app.close()

View File

@@ -0,0 +1,205 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Utility to convert a OBJ/STL/FBX into USD format.
The OBJ file format is a simple data-format that represents 3D geometry alone — namely, the position
of each vertex, the UV position of each texture coordinate vertex, vertex normals, and the faces that
make each polygon defined as a list of vertices, and texture vertices.
An STL file describes a raw, unstructured triangulated surface by the unit normal and vertices (ordered
by the right-hand rule) of the triangles using a three-dimensional Cartesian coordinate system.
FBX files are a type of 3D model file created using the Autodesk FBX software. They can be designed and
modified in various modeling applications, such as Maya, 3ds Max, and Blender. Moreover, FBX files typically
contain mesh, material, texture, and skeletal animation data.
Link: https://www.autodesk.com/products/fbx/overview
This script uses the asset converter extension from Isaac Sim (``omni.kit.asset_converter``) to convert a
OBJ/STL/FBX asset into USD format. It is designed as a convenience script for command-line use.
positional arguments:
input The path to the input mesh (.OBJ/.STL/.FBX) file.
output The path to store the USD file.
optional arguments:
-h, --help Show this help message and exit
--make-instanceable, Make the asset instanceable for efficient cloning. (default: False)
--collision-approximation The method used for approximating collision mesh. Defaults to convexDecomposition.
Set to \"none\" to not add a collision mesh to the converted mesh.
(default: convexDecomposition)
--mass The mass (in kg) to assign to the converted asset. (default: None)
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# Define collision approximation choices (must be defined before parser)
_valid_collision_approx = [
"convexDecomposition",
"convexHull",
"triangleMesh",
"meshSimplification",
"sdf",
"boundingCube",
"boundingSphere",
"none",
]
# add argparse arguments
parser = argparse.ArgumentParser(description="Utility to convert a mesh file into USD format.")
parser.add_argument("input", type=str, help="The path to the input mesh file.")
parser.add_argument("output", type=str, help="The path to store the USD file.")
parser.add_argument(
"--make-instanceable",
action="store_true",
default=False,
help="Make the asset instanceable for efficient cloning.",
)
parser.add_argument(
"--collision-approximation",
type=str,
default="convexDecomposition",
choices=_valid_collision_approx,
help="The method used for approximating the collision mesh. Set to 'none' to disable collision mesh generation.",
)
parser.add_argument(
"--mass",
type=float,
default=None,
help="The mass (in kg) to assign to the converted asset. If not provided, then no mass is added.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import contextlib
import os
import carb
import omni.kit.app
import isaaclab.sim as sim_utils
from isaaclab.sim.converters import MeshConverter, MeshConverterCfg
from isaaclab.sim.schemas import schemas_cfg
from isaaclab.utils.assets import check_file_path
from isaaclab.utils.dict import print_dict
# Map each CLI --collision-approximation name to its mesh-collision schema
# config class; "none" maps to None so no collision properties are generated.
collision_approximation_map = {
    "convexDecomposition": schemas_cfg.ConvexDecompositionPropertiesCfg,
    "convexHull": schemas_cfg.ConvexHullPropertiesCfg,
    "triangleMesh": schemas_cfg.TriangleMeshPropertiesCfg,
    "meshSimplification": schemas_cfg.TriangleMeshSimplificationPropertiesCfg,
    "sdf": schemas_cfg.SDFMeshPropertiesCfg,
    "boundingCube": schemas_cfg.BoundingCubePropertiesCfg,
    "boundingSphere": schemas_cfg.BoundingSpherePropertiesCfg,
    "none": None,
}
def main():
    """Convert the input mesh (OBJ/STL/FBX) to USD and optionally preview it in the GUI.

    Raises:
        ValueError: If the input path is invalid or the collision
            approximation name is not in ``collision_approximation_map``.
    """
    # check valid file path; normalize to absolute for the converter
    mesh_path = args_cli.input
    if not os.path.isabs(mesh_path):
        mesh_path = os.path.abspath(mesh_path)
    if not check_file_path(mesh_path):
        raise ValueError(f"Invalid mesh file path: {mesh_path}")
    # create destination path (absolute)
    dest_path = args_cli.output
    if not os.path.isabs(dest_path):
        dest_path = os.path.abspath(dest_path)
    # Mass properties: only attach mass/rigid-body schemas when a mass was given
    if args_cli.mass is not None:
        mass_props = schemas_cfg.MassPropertiesCfg(mass=args_cli.mass)
        rigid_props = schemas_cfg.RigidBodyPropertiesCfg()
    else:
        mass_props = None
        rigid_props = None
    # Collision properties ("none" disables collision-mesh generation)
    collision_props = schemas_cfg.CollisionPropertiesCfg(collision_enabled=args_cli.collision_approximation != "none")
    # Resolve the mesh-collision schema class for the requested approximation
    cfg_class = collision_approximation_map.get(args_cli.collision_approximation)
    if cfg_class is None and args_cli.collision_approximation != "none":
        # defensive: unreachable while argparse `choices` stays in sync with the map
        valid_keys = ", ".join(sorted(collision_approximation_map.keys()))
        raise ValueError(
            f"Invalid collision approximation type '{args_cli.collision_approximation}'. "
            f"Valid options are: {valid_keys}."
        )
    collision_cfg = cfg_class() if cfg_class is not None else None
    # Create Mesh converter config
    mesh_converter_cfg = MeshConverterCfg(
        mass_props=mass_props,
        rigid_props=rigid_props,
        collision_props=collision_props,
        asset_path=mesh_path,
        force_usd_conversion=True,
        usd_dir=os.path.dirname(dest_path),
        usd_file_name=os.path.basename(dest_path),
        make_instanceable=args_cli.make_instanceable,
        mesh_collision_props=collision_cfg,
    )
    # Print info
    print("-" * 80)
    print("-" * 80)
    print(f"Input Mesh file: {mesh_path}")
    print("Mesh importer config:")
    print_dict(mesh_converter_cfg.to_dict(), nesting=0)
    print("-" * 80)
    print("-" * 80)
    # Create Mesh converter and import the file
    mesh_converter = MeshConverter(mesh_converter_cfg)
    # print output
    print("Mesh importer output:")
    print(f"Generated USD file: {mesh_converter.usd_path}")
    print("-" * 80)
    print("-" * 80)
    # Determine if there is a GUI to update:
    # acquire settings interface
    carb_settings_iface = carb.settings.get_settings()
    # read flag for whether a local GUI is enabled
    local_gui = carb_settings_iface.get("/app/window/enabled")
    # read flag for whether livestreaming GUI is enabled
    livestream_gui = carb_settings_iface.get("/app/livestream/enabled")
    # Preview the converted stage when any GUI is available
    if local_gui or livestream_gui:
        # Open the stage with USD
        sim_utils.open_stage(mesh_converter.usd_path)
        # Reinitialize the simulation
        app = omni.kit.app.get_app_interface()
        # Run simulation until the app closes; Ctrl-C exits cleanly
        with contextlib.suppress(KeyboardInterrupt):
            while app.is_running():
                # perform step
                app.update()
if __name__ == "__main__":
    # run the conversion, then release the simulator app
    main()
    # close sim app
    simulation_app.close()

View File

@@ -0,0 +1,139 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Utility to convert a MJCF into USD format.
MuJoCo XML Format (MJCF) is an XML file format used in MuJoCo to describe all elements of a robot.
For more information, see: http://www.mujoco.org/book/XMLreference.html
This script uses the MJCF importer extension from Isaac Sim (``isaacsim.asset.importer.mjcf``) to convert
a MJCF asset into USD format. It is designed as a convenience script for command-line use. For more information
on the MJCF importer, see the documentation for the extension:
https://docs.isaacsim.omniverse.nvidia.com/latest/robot_setup/ext_isaacsim_asset_importer_mjcf.html
positional arguments:
input The path to the input MJCF file.
output The path to store the USD file.
optional arguments:
-h, --help Show this help message and exit
--fix-base Fix the base to where it is imported. (default: False)
--import-sites Import sites by parsing the <site> tag. (default: False)
--make-instanceable Make the asset instanceable for efficient cloning. (default: False)
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Utility to convert a MJCF into USD format.")
parser.add_argument("input", type=str, help="The path to the input MJCF file.")
parser.add_argument("output", type=str, help="The path to store the USD file.")
parser.add_argument("--fix-base", action="store_true", default=False, help="Fix the base to where it is imported.")
parser.add_argument(
"--import-sites", action="store_true", default=False, help="Import sites by parsing the <site> tag."
)
parser.add_argument(
"--make-instanceable",
action="store_true",
default=False,
help="Make the asset instanceable for efficient cloning.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import contextlib
import os
import carb
import omni.kit.app
import isaaclab.sim as sim_utils
from isaaclab.sim.converters import MjcfConverter, MjcfConverterCfg
from isaaclab.utils.assets import check_file_path
from isaaclab.utils.dict import print_dict
def main():
    """Convert the CLI-specified MJCF file to USD and optionally view the result."""
    # Resolve and validate the source asset path.
    src_path = args_cli.input
    if not os.path.isabs(src_path):
        src_path = os.path.abspath(src_path)
    if not check_file_path(src_path):
        raise ValueError(f"Invalid file path: {src_path}")
    # Resolve the destination path.
    out_path = args_cli.output
    if not os.path.isabs(out_path):
        out_path = os.path.abspath(out_path)
    # Bundle all importer options into the converter configuration.
    converter_cfg = MjcfConverterCfg(
        asset_path=src_path,
        usd_dir=os.path.dirname(out_path),
        usd_file_name=os.path.basename(out_path),
        fix_base=args_cli.fix_base,
        import_sites=args_cli.import_sites,
        force_usd_conversion=True,
        make_instanceable=args_cli.make_instanceable,
    )
    # Report the configuration before running the conversion.
    print("-" * 80)
    print("-" * 80)
    print(f"Input MJCF file: {src_path}")
    print("MJCF importer config:")
    print_dict(converter_cfg.to_dict(), nesting=0)
    print("-" * 80)
    print("-" * 80)
    # Run the conversion (happens in the converter's constructor).
    converter = MjcfConverter(converter_cfg)
    print("MJCF importer output:")
    print(f"Generated USD file: {converter.usd_path}")
    print("-" * 80)
    print("-" * 80)
    # Only open a viewer when a local or livestream GUI is available.
    carb_settings_iface = carb.settings.get_settings()
    local_gui = carb_settings_iface.get("/app/window/enabled")
    livestream_gui = carb_settings_iface.get("/app/livestream/enabled")
    if local_gui or livestream_gui:
        # Open the generated stage and spin the app loop until the user quits.
        sim_utils.open_stage(converter.usd_path)
        app = omni.kit.app.get_app_interface()
        with contextlib.suppress(KeyboardInterrupt):
            while app.is_running():
                app.update()
if __name__ == "__main__":
    # Run the conversion workflow.
    main()
    # Shut down the simulator app cleanly.
    simulation_app.close()

View File

@@ -0,0 +1,163 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Utility to convert a URDF into USD format.
Unified Robot Description Format (URDF) is an XML file format used in ROS to describe all elements of
a robot. For more information, see: http://wiki.ros.org/urdf
This script uses the URDF importer extension from Isaac Sim (``isaacsim.asset.importer.urdf``) to convert a
URDF asset into USD format. It is designed as a convenience script for command-line use. For more
information on the URDF importer, see the documentation for the extension:
https://docs.isaacsim.omniverse.nvidia.com/latest/robot_setup/ext_isaacsim_asset_importer_urdf.html
positional arguments:
input The path to the input URDF file.
output The path to store the USD file.
optional arguments:
-h, --help Show this help message and exit
--merge-joints Consolidate links that are connected by fixed joints. (default: False)
--fix-base Fix the base to where it is imported. (default: False)
--joint-stiffness The stiffness of the joint drive. (default: 100.0)
--joint-damping The damping of the joint drive. (default: 1.0)
--joint-target-type The type of control to use for the joint drive. (default: "position")
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Utility to convert a URDF into USD format.")
parser.add_argument("input", type=str, help="The path to the input URDF file.")
parser.add_argument("output", type=str, help="The path to store the USD file.")
parser.add_argument(
"--merge-joints",
action="store_true",
default=False,
help="Consolidate links that are connected by fixed joints.",
)
parser.add_argument("--fix-base", action="store_true", default=False, help="Fix the base to where it is imported.")
parser.add_argument(
"--joint-stiffness",
type=float,
default=100.0,
help="The stiffness of the joint drive.",
)
parser.add_argument(
"--joint-damping",
type=float,
default=1.0,
help="The damping of the joint drive.",
)
parser.add_argument(
"--joint-target-type",
type=str,
default="position",
choices=["position", "velocity", "none"],
help="The type of control to use for the joint drive.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import contextlib
import os
import carb
import omni.kit.app
import isaaclab.sim as sim_utils
from isaaclab.sim.converters import UrdfConverter, UrdfConverterCfg
from isaaclab.utils.assets import check_file_path
from isaaclab.utils.dict import print_dict
def main():
    """Convert the CLI-specified URDF file to USD and optionally view the result."""
    # Resolve and validate the source asset path.
    src_path = args_cli.input
    if not os.path.isabs(src_path):
        src_path = os.path.abspath(src_path)
    if not check_file_path(src_path):
        raise ValueError(f"Invalid file path: {src_path}")
    # Resolve the destination path.
    out_path = args_cli.output
    if not os.path.isabs(out_path):
        out_path = os.path.abspath(out_path)
    # Bundle all importer options, including the PD joint-drive gains, into the config.
    converter_cfg = UrdfConverterCfg(
        asset_path=src_path,
        usd_dir=os.path.dirname(out_path),
        usd_file_name=os.path.basename(out_path),
        fix_base=args_cli.fix_base,
        merge_fixed_joints=args_cli.merge_joints,
        force_usd_conversion=True,
        joint_drive=UrdfConverterCfg.JointDriveCfg(
            gains=UrdfConverterCfg.JointDriveCfg.PDGainsCfg(
                stiffness=args_cli.joint_stiffness,
                damping=args_cli.joint_damping,
            ),
            target_type=args_cli.joint_target_type,
        ),
    )
    # Report the configuration before running the conversion.
    print("-" * 80)
    print("-" * 80)
    print(f"Input URDF file: {src_path}")
    print("URDF importer config:")
    print_dict(converter_cfg.to_dict(), nesting=0)
    print("-" * 80)
    print("-" * 80)
    # Run the conversion (happens in the converter's constructor).
    converter = UrdfConverter(converter_cfg)
    print("URDF importer output:")
    print(f"Generated USD file: {converter.usd_path}")
    print("-" * 80)
    print("-" * 80)
    # Only open a viewer when a local or livestream GUI is available.
    carb_settings_iface = carb.settings.get_settings()
    local_gui = carb_settings_iface.get("/app/window/enabled")
    livestream_gui = carb_settings_iface.get("/app/livestream/enabled")
    if local_gui or livestream_gui:
        # Open the generated stage and spin the app loop until the user quits.
        sim_utils.open_stage(converter.usd_path)
        app = omni.kit.app.get_app_interface()
        with contextlib.suppress(KeyboardInterrupt):
            while app.is_running():
                app.update()
if __name__ == "__main__":
    # Run the conversion workflow.
    main()
    # Shut down the simulator app cleanly.
    simulation_app.close()

View File

@@ -0,0 +1,85 @@
# Copyright (c) 2024-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to construct prompts to control the Cosmos model's generation.
Required arguments:
--templates_path Path to the file containing templates for the prompts.
Optional arguments:
--num_prompts Number of prompts to generate (default: 1).
--output_path Path to the output file to write generated prompts (default: prompts.txt).
"""
import argparse
import json
import random
def parse_args():
    """Parse and return the command-line arguments for prompt generation."""
    arg_parser = argparse.ArgumentParser(description="Generate prompts for controlling Cosmos model's generation.")
    arg_parser.add_argument("--templates_path", type=str, required=True, help="Path to the JSON file containing prompt templates")
    arg_parser.add_argument("--num_prompts", type=int, default=1, help="Number of prompts to generate (default: 1)")
    arg_parser.add_argument("--output_path", type=str, default="prompts.txt", help="Path to the output file to write generated prompts")
    return arg_parser.parse_args()
def generate_prompt(templates_path: str) -> str:
    """Generate a random prompt for controlling the Cosmos model's visual augmentation.

    The prompt describes the scene and desired visual variations, which the model
    uses to guide the augmentation process while preserving the core robotic actions.

    Args:
        templates_path: Path to the JSON file containing prompt templates. The file
            must map section names to lists of candidate strings; non-list and empty
            sections are skipped.

    Returns:
        Generated prompt string that specifies visual aspects to modify in the video.

    Raises:
        FileNotFoundError: If the templates file does not exist.
        ValueError: If the templates file is not valid JSON.
    """
    try:
        with open(templates_path) as f:
            templates = json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(f"Prompt templates file not found: {templates_path}") from err
    except json.JSONDecodeError as err:
        raise ValueError(f"Invalid JSON in prompt templates file: {templates_path}") from err
    # Pick one option per usable section; json.load preserves the file's key order,
    # so sections contribute in the order they appear in the template file.
    prompt_parts = [
        random.choice(options) for options in templates.values() if isinstance(options, list) and options
    ]
    return " ".join(prompt_parts)
def main():
    """Generate the requested number of prompts and write them to the output file."""
    args = parse_args()
    # Build every prompt before touching the output file.
    generated = [generate_prompt(args.templates_path) for _ in range(args.num_prompts)]
    try:
        with open(args.output_path, "w") as out_file:
            for line in generated:
                out_file.write(line + "\n")
    except Exception as e:
        # Best-effort: report the failure instead of crashing the script.
        print(f"Failed to write to {args.output_path}: {e}")
if __name__ == "__main__":
    # Script entry point.
    main()

View File

@@ -0,0 +1,96 @@
{
"env": [
"A robotic arm is picking up and stacking cubes inside a foggy industrial scrapyard at dawn, surrounded by piles of old robotic parts and twisted metal. The background includes large magnetic cranes, rusted conveyor belts, and flickering yellow floodlights struggling to penetrate the fog.",
"A robotic arm is picking up and stacking cubes inside a luxury penthouse showroom during sunset. The background includes minimalist designer furniture, a panoramic view of a glowing city skyline, and hovering autonomous drones offering refreshments.",
"A robotic arm is picking up and stacking cubes within an ancient temple-themed robotics exhibit in a museum. The background includes stone columns with hieroglyphic-style etchings, interactive display panels, and a few museum visitors observing silently from behind glass barriers.",
"A robotic arm is picking up and stacking cubes inside a futuristic daycare facility for children. The background includes robotic toys, soft padded walls, holographic storybooks floating in mid-air, and tiny humanoid robots assisting toddlers.",
"A robotic arm is picking up and stacking cubes inside a deep underwater laboratory where pressure-resistant glass panels reveal a shimmering ocean outside. The background includes jellyfish drifting outside the windows, robotic submarines gliding by, and walls lined with wet-surface equipment panels.",
"A robotic arm is picking up and stacking cubes inside a post-apocalyptic lab, partially collapsed and exposed to the open sky. The background includes ruined machinery, exposed rebar, and a distant city skyline covered in ash and fog.",
"A robotic arm is picking up and stacking cubes in a biotech greenhouse surrounded by lush plant life. The background includes rows of bio-engineered plants, misting systems, and hovering inspection drones checking crop health.",
"A robotic arm is picking up and stacking cubes inside a dark, volcanic research outpost. The background includes robotic arms encased in heat-resistant suits, seismic monitors, and distant lava fountains occasionally illuminating the space.",
"A robotic arm is picking up and stacking cubes inside an icy arctic base, with frost-covered walls and equipment glinting under bright artificial white lights. The background includes heavy-duty heaters, control consoles wrapped in thermal insulation, and a large window looking out onto a frozen tundra with polar winds swirling snow outside.",
"A robotic arm is picking up and stacking cubes inside a zero-gravity chamber on a rotating space habitat. The background includes floating lab instruments, panoramic windows showing stars and Earth in rotation, and astronauts monitoring data.",
"A robotic arm is picking up and stacking cubes inside a mystical tech-art installation blending robotics with generative art. The background includes sculptural robotics, shifting light patterns on the walls, and visitors interacting with the exhibit using gestures.",
"A robotic arm is picking up and stacking cubes in a Martian colony dome, under a terraformed red sky filtering through thick glass. The background includes pressure-locked entry hatches, Martian rovers parked outside, and domed hydroponic farms stretching into the distance.",
"A robotic arm is picking up and stacking cubes inside a high-security military robotics testing bunker, with matte green steel walls and strict order. The background includes surveillance cameras, camouflage netting over equipment racks, and military personnel observing from a secure glass-walled control room.",
"A robotic arm is picking up and stacking cubes inside a retro-futuristic robotics lab from the 1980s with checkered floors and analog computer panels. The background includes CRT monitors with green code, rotary dials, printed schematics on the walls, and operators in lab coats typing on clunky terminals.",
"A robotic arm is picking up and stacking cubes inside a sunken ancient ruin repurposed for modern robotics experiments. The background includes carved pillars, vines creeping through gaps in stone, and scattered crates of modern equipment sitting on ancient floors.",
"A robotic arm is picking up and stacking cubes on a luxury interstellar yacht cruising through deep space. The background includes elegant furnishings, ambient synth music systems, and holographic butlers attending to other passengers.",
"A robotic arm is picking up and stacking cubes in a rebellious underground cybernetic hacker hideout. The background includes graffiti-covered walls, tangled wires, makeshift workbenches, and anonymous figures hunched over terminals with scrolling code.",
"A robotic arm is picking up and stacking cubes inside a dense jungle outpost where technology is being tested in extreme organic environments. The background includes humid control panels, vines creeping onto the robotics table, and occasional wildlife observed from a distance by researchers in camo gear.",
"A robotic arm is picking up and stacking cubes in a minimalist Zen tech temple. The background includes bonsai trees on floating platforms, robotic monks sweeping floors silently, and smooth stone pathways winding through digital meditation alcoves."
],
"robot": [
"The robot arm is matte dark green with yellow diagonal hazard stripes along the upper arm; the joints are rugged and chipped, and the hydraulics are exposed with faded red tubing.",
"The robot arm is worn orange with black caution tape markings near the wrist; the elbow joint is dented and the pistons have visible scarring from long use.",
"The robot arm is steel gray with smooth curved panels and subtle blue stripes running down the length; the joints are sealed tight and the hydraulics have a glossy black casing.",
"The robot arm is bright yellow with alternating black bands around each segment; the joints show minor wear, and the hydraulics gleam with fresh lubrication.",
"The robot arm is navy blue with white serial numbers stenciled along the arm; the joints are well-maintained and the hydraulic shafts are matte silver with no visible dirt.",
"The robot arm is deep red with a matte finish and faint white grid lines across the panels; the joints are squared off and the hydraulic units look compact and embedded.",
"The robot arm is dirty white with dark gray speckled patches from wear; the joints are squeaky with exposed rivets, and the hydraulics are rusted at the base.",
"The robot arm is olive green with chipped paint and a black triangle warning icon near the shoulder; the joints are bulky and the hydraulics leak slightly around the seals.",
"The robot arm is bright teal with a glossy surface and silver stripes on the outer edges; the joints rotate smoothly and the pistons reflect a pale cyan hue.",
"The robot arm is orange-red with carbon fiber textures and white racing-style stripes down the forearm; the joints have minimal play and the hydraulics are tightly sealed in synthetic tubing.",
"The robot arm is flat black with uneven camouflage blotches in dark gray; the joints are reinforced and the hydraulic tubes are dusty and loose-fitting.",
"The robot arm is dull maroon with vertical black grooves etched into the panels; the joints show corrosion on the bolts and the pistons are thick and slow-moving.",
"The robot arm is powder blue with repeating geometric patterns printed in light gray; the joints are square and the hydraulic systems are internal and silent.",
"The robot arm is brushed silver with high-gloss finish and blue LED strips along the seams; the joints are shiny and tight, and the hydraulics hiss softly with every movement.",
"The robot arm is lime green with paint faded from sun exposure and white warning labels near each joint; the hydraulics are scraped and the fittings show heat marks.",
"The robot arm is dusty gray with chevron-style black stripes pointing toward the claw; the joints have uneven wear, and the pistons are dented and slightly bent.",
"The robot arm is cobalt blue with glossy texture and stylized angular black patterns across each segment; the joints are clean and the hydraulics show new flexible tubing.",
"The robot arm is industrial brown with visible welded seams and red caution tape wrapped loosely around the middle section; the joints are clunky and the hydraulics are slow and loud.",
"The robot arm is flat tan with dark green splotches and faint stencil text across the forearm; the joints have dried mud stains and the pistons are partially covered in grime.",
"The robot arm is light orange with chrome hexagon detailing and black number codes on the side; the joints are smooth and the hydraulic actuators shine under the lab lights."
],
"table": [
"The robot arm is mounted on a table that is dull gray metal with scratches and scuff marks across the surface; faint rust rings are visible where older machinery used to be mounted.",
"The robot arm is mounted on a table that is smooth black plastic with a matte finish and faint fingerprint smudges near the edges; corners are slightly worn from regular use.",
"The robot arm is mounted on a table that is light oak wood with a natural grain pattern and a glossy varnish that reflects overhead lights softly; small burn marks dot one corner.",
"The robot arm is mounted on a table that is rough concrete with uneven texture and visible air bubbles; some grease stains and faded yellow paint markings suggest heavy usage.",
"The robot arm is mounted on a table that is brushed aluminum with a clean silver tone and very fine linear grooves; surface reflects light evenly, giving a soft glow.",
"The robot arm is mounted on a table that is pale green composite with chipped corners and scratches revealing darker material beneath; tape residue is stuck along the edges.",
"The robot arm is mounted on a table that is dark brown with a slightly cracked synthetic coating; patches of discoloration suggest exposure to heat or chemicals over time.",
"The robot arm is mounted on a table that is polished steel with mirror-like reflections; every small movement of the robot is mirrored faintly across the surface.",
"The robot arm is mounted on a table that is white with a slightly textured ceramic top, speckled with tiny black dots; the surface is clean but the edges are chipped.",
"The robot arm is mounted on a table that is glossy black glass with a deep shine and minimal dust; any lights above are clearly reflected, and fingerprints are visible under certain angles.",
"The robot arm is mounted on a table that is matte red plastic with wide surface scuffs and paint transfer from other objects; faint gridlines are etched into one side.",
"The robot arm is mounted on a table that is dark navy laminate with a low-sheen surface and subtle wood grain texture; the edge banding is slightly peeling off.",
"The robot arm is mounted on a table that is yellow-painted steel with diagonal black warning stripes running along one side; the paint is scratched and faded in high-contact areas.",
"The robot arm is mounted on a table that is translucent pale blue polymer with internal striations and slight glow under overhead lights; small bubbles are frozen inside the material.",
"The robot arm is mounted on a table that is cold concrete with embedded metal panels bolted into place; the surface has oil stains, welding marks, and tiny debris scattered around.",
"The robot arm is mounted on a table that is shiny chrome with heavy smudging and streaks; the table reflects distorted shapes of everything around it, including the arm itself.",
"The robot arm is mounted on a table that is matte forest green with shallow dents and drag marks from prior mechanical operations; a small sticker label is half-torn in one corner.",
"The robot arm is mounted on a table that is textured black rubber with slight give under pressure; scratches from the robot's base and clamp marks are clearly visible.",
"The robot arm is mounted on a table that is medium gray ceramic tile with visible grout lines and chips along the edges; some tiles have tiny cracks or stains.",
"The robot arm is mounted on a table that is old dark wood with faded polish and visible circular stains from spilled liquids; a few deep grooves are carved into the surface near the center."
],
"cubes": [
"The arm is connected to the base mounted on the table. The bottom cube is deep blue, the second cube is bright red, and the top cube is vivid green, maintaining their correct order after stacking."
],
"light": [
"The lighting is soft and diffused from large windows, allowing daylight to fill the room, creating gentle shadows that elongate throughout the space, with a natural warmth due to the sunlight streaming in.",
"Bright fluorescent tubes overhead cast a harsh, even light across the scene, creating sharp, well-defined shadows under the arm and cubes, with a sterile, clinical feel due to the cold white light.",
"Warm tungsten lights in the ceiling cast a golden glow over the table, creating long, soft shadows and a cozy, welcoming atmosphere. The light contrasts with cool blue tones from the robot arm.",
"The lighting comes from several intense spotlights mounted above, each casting focused beams of light that create stark, dramatic shadows around the cubes and the robotic arm, producing a high-contrast look.",
"A single adjustable desk lamp with a soft white bulb casts a directional pool of light over the cubes, causing deep, hard shadows and a quiet, intimate feel in the dimly lit room.",
"The space is illuminated with bright daylight filtering in through a skylight above, casting diffused, soft shadows and giving the scene a clean and natural look, with a cool tint from the daylight.",
"Soft, ambient lighting from hidden LEDs embedded in the ceiling creates a halo effect around the robotic arm, while subtle, elongated shadows stretch across the table surface, giving a sleek modern vibe.",
"Neon strip lights line the walls, casting a cool blue and purple glow across the scene. The robot and table are bathed in this colored light, producing sharp-edged shadows with a futuristic feel.",
"Bright artificial lights overhead illuminate the scene in a harsh white, with scattered, uneven shadows across the table and robot arm. There's a slight yellow hue to the light, giving it an industrial ambiance.",
"Soft morning sunlight spills through a large open window, casting long shadows across the floor and the robot arm. The warm, golden light creates a peaceful, natural atmosphere with a slight coolness in the shadows.",
"Dim ambient lighting with occasional flashes of bright blue light from overhead digital screens creates a high-tech, slightly eerie atmosphere. The shadows are soft, stretching in an almost surreal manner.",
"Lighting from tall lamps outside the room filters in through large glass doors, casting angled shadows across the table and robot arm. The ambient light creates a relaxing, slightly diffused atmosphere.",
"Artificial overhead lighting casts a harsh, stark white light with little warmth, producing sharply defined, almost clinical shadows on the robot arm and cubes. The space feels cold and industrial.",
"Soft moonlight from a large window at night creates a cool, ethereal glow on the table and arm. The shadows are long and faint, and the lighting provides a calm and serene atmosphere.",
"Bright overhead LED panels illuminate the scene with clean, white light, casting neutral shadows that give the environment a modern, sleek feel with minimal distortion or softness in the shadows.",
"A floodlight positioned outside casts bright, almost blinding natural light through an open door, creating high-contrast, sharp-edged shadows across the table and robot arm, adding dramatic tension to the scene.",
"Dim lighting from vintage tungsten bulbs hanging from the ceiling gives the room a warm, nostalgic glow, casting elongated, soft shadows that provide a cozy atmosphere around the robotic arm.",
"Bright fluorescent lights directly above produce a harsh, clinical light that creates sharp, defined shadows on the table and robotic arm, enhancing the industrial feel of the scene.",
"Neon pink and purple lights flicker softly from the walls, illuminating the robot arm with an intense glow that produces sharp, angular shadows across the cubes. The atmosphere feels futuristic and edgy.",
"Sunlight pouring in from a large, open window bathes the table and robotic arm in a warm golden light. The shadows are soft, and the scene feels natural and inviting with a slight contrast between light and shadow."
]
}

View File

@@ -0,0 +1,208 @@
# Copyright (c) 2024-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to convert HDF5 demonstration files to MP4 videos.
This script converts camera frames stored in HDF5 demonstration files to MP4 videos.
It supports multiple camera modalities including RGB, segmentation, and normal maps.
The output videos are saved in the specified directory with appropriate naming.
required arguments:
--input_file Path to the input HDF5 file.
--output_dir Directory to save the output MP4 files.
optional arguments:
--input_keys List of input keys to process from the HDF5 file.
(default: ["table_cam", "wrist_cam", "table_cam_segmentation",
"table_cam_normals", "table_cam_shaded_segmentation", "table_cam_depth"])
--video_height Height of the output video in pixels. (default: 704)
--video_width Width of the output video in pixels. (default: 1280)
--framerate Frames per second for the output video. (default: 30)
"""
import argparse
import os
import cv2
import h5py
import numpy as np
# Constants
# Default output video resolution (pixels).
DEFAULT_VIDEO_HEIGHT = 704
DEFAULT_VIDEO_WIDTH = 1280
# Observation keys converted when --input_keys is not supplied on the CLI.
DEFAULT_INPUT_KEYS = [
    "table_cam",
    "wrist_cam",
    "table_cam_segmentation",
    "table_cam_normals",
    "table_cam_shaded_segmentation",
    "table_cam_depth",
]
# Default frames per second of the written MP4 files.
DEFAULT_FRAMERATE = 30
# Light direction used to shade segmentation masks from normal maps
# (assumed unit +Z convention for the stored normals — TODO confirm).
LIGHT_SOURCE = np.array([0.0, 0.0, 1.0])
# Depth readings are normalized to this range for visualization
# (presumably meters — verify against the camera configuration).
MIN_DEPTH = 0.0
MAX_DEPTH = 1.5
def parse_args():
    """Parse and return the command-line arguments for HDF5-to-MP4 conversion."""
    parser = argparse.ArgumentParser(description="Convert HDF5 demonstration files to MP4 videos.")
    parser.add_argument("--input_file", type=str, required=True, help="Path to the input HDF5 file containing demonstration data.")
    parser.add_argument("--output_dir", type=str, required=True, help="Directory path where the output MP4 files will be saved.")
    parser.add_argument("--input_keys", type=str, nargs="+", default=DEFAULT_INPUT_KEYS, help="List of input keys to process.")
    parser.add_argument("--video_height", type=int, default=DEFAULT_VIDEO_HEIGHT, help="Height of the output video in pixels.")
    parser.add_argument("--video_width", type=int, default=DEFAULT_VIDEO_WIDTH, help="Width of the output video in pixels.")
    parser.add_argument("--framerate", type=int, default=DEFAULT_FRAMERATE, help="Frames per second for the output video.")
    return parser.parse_args()
def write_demo_to_mp4(
    hdf5_file,
    demo_id,
    frames_path,
    input_key,
    output_dir,
    video_height,
    video_width,
    framerate=DEFAULT_FRAMERATE,
):
    """Convert frames from an HDF5 file to an MP4 video.

    Args:
        hdf5_file (str): Path to the HDF5 file containing the frames.
        demo_id (int): ID of the demonstration to convert.
        frames_path (str): Path to the frames data in the HDF5 file. Note: not used
            for "shaded_segmentation" keys, whose source path is rebuilt from demo_id.
        input_key (str): Name of the input key to convert.
        output_dir (str): Directory to save the output MP4 file.
        video_height (int): Height of the output video in pixels.
        video_width (int): Width of the output video in pixels.
        framerate (int, optional): Frames per second for the output video. Defaults to 30.
    """
    with h5py.File(hdf5_file, "r") as f:
        # Get frames based on input key type: shaded segmentation has no stored
        # frames of its own — it is rendered on the fly from the plain
        # segmentation frames plus the normal maps.
        if "shaded_segmentation" in input_key:
            temp_key = input_key.replace("shaded_segmentation", "segmentation")
            frames = f[f"data/demo_{demo_id}/obs/{temp_key}"]
        else:
            frames = f[frames_path + "/" + input_key]
        # Setup video writer; depth videos are written single-channel (grayscale).
        output_path = os.path.join(output_dir, f"demo_{demo_id}_{input_key}.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        if "depth" in input_key:
            video = cv2.VideoWriter(output_path, fourcc, framerate, (video_width, video_height), isColor=False)
        else:
            video = cv2.VideoWriter(output_path, fourcc, framerate, (video_width, video_height))
        # Process and write frames
        for ix, frame in enumerate(frames):
            # Convert normal maps to uint8 if needed
            # (assumes normals are stored scaled to [0, 1] — TODO confirm).
            if "normals" in input_key:
                frame = (frame * 255.0).astype(np.uint8)
            # Process shaded segmentation frames: scale the leading channels by
            # 0.5 + 0.5 * (normal . LIGHT_SOURCE), passing the last channel through.
            elif "shaded_segmentation" in input_key:
                seg = frame[..., :-1]
                normals_key = input_key.replace("shaded_segmentation", "normals")
                normals = f[f"data/demo_{demo_id}/obs/{normals_key}"][ix]
                shade = 0.5 + (normals * LIGHT_SOURCE[None, None, :]).sum(axis=-1) * 0.5
                shaded_seg = (shade[..., None] * seg).astype(np.uint8)
                frame = np.concatenate((shaded_seg, frame[..., -1:]), axis=-1)
            # Convert RGB to BGR for OpenCV; depth is normalized and inverted instead.
            if "depth" not in input_key:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            else:
                # Normalize channel 0 over [MIN_DEPTH, MAX_DEPTH], treat near-zero
                # readings as far away, then invert so near objects appear bright.
                frame = (frame[..., 0] - MIN_DEPTH) / (MAX_DEPTH - MIN_DEPTH)
                frame = np.where(frame < 0.01, 1.0, frame)
                frame = 1.0 - frame
                frame = (frame * 255.0).astype(np.uint8)
            # Resize to video resolution
            frame = cv2.resize(frame, (video_width, video_height), interpolation=cv2.INTER_CUBIC)
            video.write(frame)
        video.release()
def get_num_demos(hdf5_file):
    """Count the demonstrations stored in an HDF5 file.

    Args:
        hdf5_file (str): Path to the HDF5 file.

    Returns:
        int: Number of demonstrations found in the file.
    """
    # Each child of the top-level "data" group is one demonstration.
    with h5py.File(hdf5_file, "r") as demo_file:
        return len(demo_file["data"].keys())
def main():
    """Convert every demonstration in the input HDF5 file to MP4 videos."""
    args = parse_args()
    # Make sure the destination directory exists before writing any videos.
    os.makedirs(args.output_dir, exist_ok=True)
    num_demos = get_num_demos(args.input_file)
    print(f"Found {num_demos} demonstrations in {args.input_file}")
    # Emit one video per (demonstration, camera key) pair.
    for demo_id in range(num_demos):
        obs_path = f"data/demo_{demo_id}/obs"
        for key in args.input_keys:
            write_demo_to_mp4(
                args.input_file,
                demo_id,
                obs_path,
                key,
                args.output_dir,
                args.video_height,
                args.video_width,
                args.framerate,
            )
if __name__ == "__main__":
    # Script entry point.
    main()

View File

@@ -0,0 +1,47 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import os
import h5py
# CLI for the merge tool: a list of source files and one destination path.
parser = argparse.ArgumentParser(description="Merge a set of HDF5 datasets.")
parser.add_argument("--input_files", type=str, nargs="+", default=[], help="A list of paths to HDF5 files to merge.")
parser.add_argument("--output_file", type=str, default="merged_dataset.hdf5", help="File path to merged output.")
args_cli = parser.parse_args()
def merge_datasets():
    """Merge the episodes of all ``--input_files`` into ``--output_file``.

    Episodes are renumbered sequentially as ``demo_0``, ``demo_1``, ... in the
    output file. The ``env_args`` attribute is copied from the first input
    file only.

    Raises:
        FileNotFoundError: If any of the input dataset files does not exist.
    """
    # Validate all inputs up front so we fail before creating the output file.
    for filepath in args_cli.input_files:
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"The dataset file {filepath} does not exist.")
    with h5py.File(args_cli.output_file, "w") as output:
        episode_idx = 0
        copy_attributes = True
        for filepath in args_cli.input_files:
            # Renamed from `input` to avoid shadowing the builtin.
            with h5py.File(filepath, "r") as source:
                # Only the keys are needed; the group values were unused.
                for episode in source["data"].keys():
                    source.copy(f"data/{episode}", output, f"data/demo_{episode_idx}")
                    episode_idx += 1
                if copy_attributes:
                    # Copy environment metadata once, from the first file.
                    output["data"].attrs["env_args"] = source["data"].attrs["env_args"]
                    copy_attributes = False
    print(f"Merged dataset saved to {args_cli.output_file}")


if __name__ == "__main__":
    merge_datasets()

View File

@@ -0,0 +1,169 @@
# Copyright (c) 2024-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to create a new dataset by combining existing HDF5 demonstrations with visually augmented MP4 videos.
This script takes an existing HDF5 dataset containing demonstrations and a directory of MP4 videos
that are visually augmented versions of the original demonstration videos (e.g., with different lighting,
color schemes, or visual effects). It creates a new HDF5 dataset that preserves all the original
demonstration data (actions, robot state, etc.) but replaces the video frames with the augmented versions.
required arguments:
--input_file Path to the input HDF5 file containing original demonstrations.
--output_file Path to save the new HDF5 file with augmented videos.
--videos_dir Directory containing the visually augmented MP4 videos.
"""
import argparse
import glob
import os
import cv2
import h5py
import numpy as np
def parse_args():
    """Build and evaluate the command-line parser for this script."""
    arg_parser = argparse.ArgumentParser(description="Create a new dataset with visually augmented videos.")
    # All three options are required string paths.
    for flag, help_text in (
        ("--input_file", "Path to the input HDF5 file containing original demonstrations."),
        ("--videos_dir", "Directory containing the visually augmented MP4 videos."),
        ("--output_file", "Path to save the new HDF5 file with augmented videos."),
    ):
        arg_parser.add_argument(flag, type=str, required=True, help=help_text)
    return arg_parser.parse_args()
def get_frames_from_mp4(video_path, target_height=None, target_width=None):
    """Load every frame of an MP4 file as an RGB uint8 array.

    Args:
        video_path (str): Path to the MP4 video file.
        target_height (int, optional): Resize frames to this height; requires
            ``target_width`` as well. No resizing when None.
        target_width (int, optional): Resize frames to this width.

    Returns:
        np.ndarray: Frames of the video in RGB format.
    """
    capture = cv2.VideoCapture(video_path)
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    should_resize = target_height is not None and target_width is not None
    frames = []
    for _ in range(total_frames):
        ok, bgr_frame = capture.read()
        if not ok:
            # Stop early if the container reported more frames than readable.
            break
        rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        if should_resize:
            rgb_frame = cv2.resize(rgb_frame, (target_width, target_height), interpolation=cv2.INTER_LINEAR)
        frames.append(rgb_frame)
    capture.release()
    return np.array(frames).astype(np.uint8)
def process_video_and_demo(f_in, f_out, video_path, orig_demo_id, new_demo_id):
    """Copy one demo into the output file, swapping in augmented table-cam frames.

    Args:
        f_in (h5py.File): Input HDF5 file.
        f_out (h5py.File): Output HDF5 file.
        video_path (str): Path to the augmented video file.
        orig_demo_id (int): ID of the original demo to copy.
        new_demo_id (int): ID for the new demo.
    """
    src_prefix = f"data/demo_{str(orig_demo_id)}"
    dst_prefix = f"data/demo_{str(new_demo_id)}"
    # Match the resolution of the original table-camera recording.
    orig_table_cam = f_in[f"{src_prefix}/obs/table_cam"]
    target_height, target_width = orig_table_cam.shape[1:3]
    frames = get_frames_from_mp4(video_path, target_height, target_width)
    # These streams are copied verbatim from the source demo.
    for rel_name in ("actions", "obs/eef_pos", "obs/eef_quat", "obs/gripper_pos"):
        f_out.create_dataset(f"{dst_prefix}/{rel_name}", data=f_in[f"{src_prefix}/{rel_name}"], compression="gzip")
    # The table camera stream is replaced by the augmented video frames.
    f_out.create_dataset(f"{dst_prefix}/obs/table_cam", data=frames.astype(np.uint8), compression="gzip")
    f_out.create_dataset(f"{dst_prefix}/obs/wrist_cam", data=f_in[f"{src_prefix}/obs/wrist_cam"], compression="gzip")
    # Preserve the sample count attribute of the source demo.
    f_out[dst_prefix].attrs["num_samples"] = f_in[src_prefix].attrs["num_samples"]
def main():
    """Create a new dataset whose extra demos use the augmented videos."""
    args = parse_args()
    # Collect the augmented videos, sorted for deterministic demo numbering.
    search_path = os.path.join(args.videos_dir, "*.mp4")
    video_paths = glob.glob(search_path)
    video_paths.sort()
    print(f"Found {len(video_paths)} MP4 videos in {args.videos_dir}")
    # Create the output directory if needed. Guard against an empty dirname
    # (e.g. --output_file out.hdf5): os.makedirs("") raises FileNotFoundError.
    output_dir = os.path.dirname(args.output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with h5py.File(args.input_file, "r") as f_in, h5py.File(args.output_file, "w") as f_out:
        # Copy all original demos into the output.
        f_in.copy("data", f_out)
        # New demos continue numbering after the largest existing demo ID.
        demo_ids = [int(key.split("_")[1]) for key in f_in["data"].keys()]
        next_demo_id = max(demo_ids) + 1  # noqa: SIM113
        print(f"Starting new demos from ID: {next_demo_id}")
        # Process each video and create a new demo from it.
        for video_path in video_paths:
            # Video filenames encode the source demo ID, e.g. "demo_3_....mp4".
            video_filename = os.path.basename(video_path)
            orig_demo_id = int(video_filename.split("_")[1])
            process_video_and_demo(f_in, f_out, video_path, orig_demo_id, next_demo_id)
            next_demo_id += 1
    print(f"Augmented data saved to {args.output_file}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,90 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Convert all mesh files to `.obj` in given folders."""
import argparse
import os
import shutil
import subprocess
# Constants
# Path to blender
BLENDER_EXE_PATH = shutil.which("blender")
def parse_cli_args():
    """Parse the input command line arguments."""
    cli_parser = argparse.ArgumentParser("Utility to convert all mesh files to `.obj` in given folders.")
    cli_parser.add_argument("input_dir", type=str, help="The input directory from which to load meshes.")
    cli_parser.add_argument(
        "-o",
        "--output_dir",
        type=str,
        default=None,
        help="The output directory to save converted meshes into. Default is same as input directory.",
    )
    parsed = cli_parser.parse_args()
    # Fall back to writing converted meshes next to the sources.
    if parsed.output_dir is None:
        parsed.output_dir = parsed.input_dir
    return parsed
def run_blender_convert2obj(in_file: str, out_file: str):
    """Calls blender via `subprocess` to convert a single mesh file.

    Args:
        in_file: Input mesh file.
        out_file: Output obj file.
    """
    # resolve the helper script that runs inside blender
    tools_dirname = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(tools_dirname, "blender_obj.py")
    # Build the argument list directly instead of splitting a command string:
    # splitting on spaces broke any path that contained a space.
    command_exe_list = [BLENDER_EXE_PATH, "--background", "--python", script_file, "--", "-i", in_file, "-o", out_file]
    # run command
    subprocess.run(command_exe_list)
def convert_meshes(source_folders: list[str], destination_folders: list[str]):
    """Convert every supported mesh in the source folders to OBJ via blender.

    Args:
        source_folders: List of directories to search for meshes.
        destination_folders: List of directories to dump converted files.
    """
    # make sure every destination folder exists
    for dest in destination_folders:
        os.makedirs(dest, exist_ok=True)
    for in_folder, out_folder in zip(source_folders, destination_folders):
        # gather candidate files, keeping the original extension ordering
        folder_contents = os.listdir(in_folder)
        mesh_filenames = [f for f in folder_contents if f.endswith("dae")]
        mesh_filenames += [f for f in folder_contents if f.endswith("stl")]
        mesh_filenames += [f for f in folder_contents if f.endswith("STL")]
        print(f"Found {len(mesh_filenames)} files to process in directory: {in_folder}")
        # convert each mesh file through blender
        for mesh_file in mesh_filenames:
            stem = os.path.splitext(mesh_file)[0]
            source_path = os.path.join(in_folder, mesh_file)
            target_path = os.path.join(out_folder, stem + ".obj")
            print("Processing: ", source_path)
            run_blender_convert2obj(source_path, target_path)


if __name__ == "__main__":
    # Parse command line arguments
    args = parse_cli_args()
    # Run conversion
    convert_meshes([args.input_dir], [args.output_dir])

View File

@@ -0,0 +1,593 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to record demonstrations with Isaac Lab environments using human teleoperation.
This script allows users to record demonstrations operated by human teleoperation for a specified task.
The recorded demonstrations are stored as episodes in a hdf5 file. Users can specify the task, teleoperation
device, dataset directory, and environment stepping rate through command-line arguments.
required arguments:
--task Name of the task.
optional arguments:
-h, --help Show this help message and exit
--teleop_device Device for interacting with environment. (default: keyboard)
--dataset_file File path to export recorded demos. (default: "./datasets/dataset.hdf5")
--step_hz Environment stepping rate in Hz. (default: 30)
--num_demos Number of demonstrations to record. (default: 0)
--num_success_steps Number of continuous steps with task success for concluding a demo as successful.
(default: 10)
"""
"""Launch Isaac Sim Simulator first."""
# Standard library imports
import argparse
import contextlib
# Isaac Lab AppLauncher
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Record demonstrations for Isaac Lab environments.")
parser.add_argument("--task", type=str, required=True, help="Name of the task.")
parser.add_argument(
"--teleop_device",
type=str,
default="keyboard",
help=(
"Teleop device. Set here (legacy) or via the environment config. If using the environment config, pass the"
" device key/name defined under 'teleop_devices' (it can be a custom name, not necessarily 'handtracking')."
" Built-ins: keyboard, spacemouse, gamepad. Not all tasks support all built-ins."
),
)
parser.add_argument(
"--dataset_file", type=str, default="./datasets/dataset.hdf5", help="File path to export recorded demos."
)
parser.add_argument("--step_hz", type=int, default=30, help="Environment stepping rate in Hz.")
parser.add_argument(
"--num_demos", type=int, default=0, help="Number of demonstrations to record. Set to 0 for infinite."
)
parser.add_argument(
"--num_success_steps",
type=int,
default=10,
help="Number of continuous steps with task success for concluding a demo as successful. Default is 10.",
)
parser.add_argument(
"--enable_pinocchio",
action="store_true",
default=False,
help="Enable Pinocchio.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# Validate required arguments.
# NOTE(review): --task is declared with required=True above, so argparse already
# enforces this; the check below is a defensive fallback.
if args_cli.task is None:
    parser.error("--task is required")
app_launcher_args = vars(args_cli)
if args_cli.enable_pinocchio:
    # Import pinocchio before AppLauncher to force the use of the version
    # installed by IsaacLab and not the one installed by Isaac Sim.
    # pinocchio is required by the Pink IK controllers and the GR1T2 retargeter
    import pinocchio  # noqa: F401
if "handtracking" in args_cli.teleop_device.lower():
    # vars() above returned args_cli's namespace dict, so setting "xr" here is
    # visible through args_cli when it is passed to AppLauncher below.
    app_launcher_args["xr"] = True
# launch the simulator
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
# Third-party imports
import logging
import os
import sys
import time
import gymnasium as gym
import torch
import omni.ui as ui
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg, Se3SpaceMouse, Se3SpaceMouseCfg
from isaaclab.devices.openxr import remove_camera_configs
from isaaclab.devices.teleop_device_factory import create_teleop_device
import isaaclab_mimic.envs # noqa: F401
from isaaclab_mimic.ui.instruction_display import InstructionDisplay, show_subtask_instructions
if args_cli.enable_pinocchio:
import isaaclab_tasks.manager_based.locomanipulation.pick_place # noqa: F401
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
from collections.abc import Callable
from isaaclab.envs import DirectRLEnvCfg, ManagerBasedRLEnvCfg
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.envs.ui import EmptyWindow
from isaaclab.managers import DatasetExportMode
# Add workspace root to sys.path so mindrobot_keyboard is importable as a package path.
# record_demos.py lives at scripts/tools/, workspace root is three levels up.
_ws_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if _ws_root not in sys.path:
sys.path.insert(0, _ws_root)
from scripts.environments.teleoperation.mindrobot_keyboard import MindRobotCombinedKeyboard # noqa: E402
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401 — registers Isaac-MindRobot-* environments
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# import logger
logger = logging.getLogger(__name__)
class RateLimiter:
    """Convenience class for enforcing rates in loops."""

    def __init__(self, hz: int):
        """Initialize a RateLimiter with specified frequency.

        Args:
            hz: Frequency to enforce in Hertz.
        """
        self.hz = hz
        self.last_time = time.time()
        self.sleep_duration = 1.0 / hz
        # Render at least every 33 ms while waiting so the UI stays responsive.
        self.render_period = min(0.033, self.sleep_duration)

    def sleep(self, env: gym.Env):
        """Attempt to sleep at the specified rate in hz.

        Args:
            env: Environment to render during sleep periods.
        """
        wakeup_at = self.last_time + self.sleep_duration
        while time.time() < wakeup_at:
            time.sleep(self.render_period)
            env.sim.render()
        self.last_time += self.sleep_duration
        # If the loop ran slower than the target rate, skip ahead instead of
        # trying to catch up with a burst of zero-length sleeps.
        while self.last_time < time.time():
            self.last_time += self.sleep_duration
def setup_output_directories() -> tuple[str, str]:
    """Set up output directories for saving demonstrations.

    Creates the directory portion of ``--dataset_file`` if missing and splits
    the path into a directory and a bare (extension-less) file name.

    Returns:
        tuple[str, str]: A tuple containing:
            - output_dir: The directory path where the dataset will be saved
            - output_file_name: The filename (without extension) for the dataset
    """
    dataset_path = args_cli.dataset_file
    output_dir = os.path.dirname(dataset_path)
    output_file_name = os.path.splitext(os.path.basename(dataset_path))[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created output directory: {output_dir}")
    return output_dir, output_file_name
def create_environment_config(
    output_dir: str, output_file_name: str
) -> tuple[ManagerBasedRLEnvCfg | DirectRLEnvCfg, object | None]:
    """Create and configure the environment configuration.

    Parses the environment configuration and makes necessary adjustments for demo recording.
    Extracts the success termination function and configures the recorder manager.

    Args:
        output_dir: Directory where recorded demonstrations will be saved
        output_file_name: Name of the file to store the demonstrations

    Returns:
        tuple[isaaclab_tasks.utils.parse_cfg.EnvCfg, Optional[object]]: A tuple containing:
            - env_cfg: The configured environment configuration
            - success_term: The success termination object or None if not available

    Raises:
        Exception: If parsing the environment configuration fails
    """
    # parse configuration (single-env recording)
    try:
        env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1)
        env_cfg.env_name = args_cli.task.split(":")[-1]
    except Exception as e:
        logger.error(f"Failed to parse environment configuration: {e}")
        exit(1)
    # extract success checking function to invoke in the main loop; it is
    # removed from the terminations so the episode keeps running and the main
    # loop decides when an episode counts as successful.
    success_term = None
    if hasattr(env_cfg.terminations, "success"):
        success_term = env_cfg.terminations.success
        env_cfg.terminations.success = None
    else:
        logger.warning(
            "No success termination term was found in the environment."
            " Will not be able to mark recorded demos as successful."
        )
    if args_cli.xr:
        # If cameras are not enabled and XR is enabled, remove camera configs
        if not args_cli.enable_cameras:
            env_cfg = remove_camera_configs(env_cfg)
        env_cfg.sim.render.antialiasing_mode = "DLSS"
    # modify configuration such that the environment runs indefinitely until
    # the goal is reached or other termination conditions are met
    env_cfg.terminations.time_out = None
    env_cfg.observations.policy.concatenate_terms = False
    # recorder manager: only episodes explicitly marked successful are exported
    env_cfg.recorders: ActionStateRecorderManagerCfg = ActionStateRecorderManagerCfg()
    env_cfg.recorders.dataset_export_dir_path = output_dir
    env_cfg.recorders.dataset_filename = output_file_name
    env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
    return env_cfg, success_term
def create_environment(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg) -> gym.Env:
    """Instantiate the gym environment for the configured task.

    Args:
        env_cfg: The environment configuration object that defines the environment properties.
            This should be an instance of EnvCfg created by parse_env_cfg().

    Returns:
        gym.Env: A Gymnasium environment instance for the specified task.

    Raises:
        SystemExit: If environment creation fails for any reason.
    """
    try:
        return gym.make(args_cli.task, cfg=env_cfg).unwrapped
    except Exception as e:
        logger.error(f"Failed to create environment: {e}")
        exit(1)
def setup_teleop_device(callbacks: dict[str, Callable]) -> object:
    """Set up the teleoperation device based on configuration.

    Attempts to create a teleoperation device based on the environment configuration.
    Falls back to default devices if the specified device is not found in the configuration.

    Args:
        callbacks: Dictionary mapping callback keys to functions that will be
            attached to the teleop device

    Returns:
        object: The configured teleoperation device interface

    Raises:
        SystemExit: If teleop device creation fails or the device is unsupported.
    """
    # NOTE(review): reads the module-level `env_cfg` set as a global in main();
    # confirm main() always runs before this is called.
    teleop_interface = None
    try:
        if hasattr(env_cfg, "teleop_devices") and args_cli.teleop_device in env_cfg.teleop_devices.devices:
            teleop_interface = create_teleop_device(args_cli.teleop_device, env_cfg.teleop_devices.devices, callbacks)
        else:
            logger.warning(
                f"No teleop device '{args_cli.teleop_device}' found in environment config. Creating default."
            )
            # Create fallback teleop device
            if args_cli.teleop_device.lower() == "keyboard":
                teleop_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.2, rot_sensitivity=0.5))
            elif args_cli.teleop_device.lower() == "spacemouse":
                teleop_interface = Se3SpaceMouse(Se3SpaceMouseCfg(pos_sensitivity=0.2, rot_sensitivity=0.5))
            elif args_cli.teleop_device.lower() == "mindrobot_keyboard":
                teleop_interface = MindRobotCombinedKeyboard(
                    pos_sensitivity=0.05,
                    rot_sensitivity=0.05,
                    wheel_speed=5.0,
                    sim_device=args_cli.device,
                )
                # Callbacks are attached by the shared loop below, the same as
                # for the other fallback devices.
            else:
                logger.error(f"Unsupported teleop device: {args_cli.teleop_device}")
                logger.error("Supported devices: keyboard, spacemouse, handtracking")
                exit(1)
            # Add callbacks to fallback device
            for key, callback in callbacks.items():
                teleop_interface.add_callback(key, callback)
    except Exception as e:
        logger.error(f"Failed to create teleop device: {e}")
        exit(1)
    if teleop_interface is None:
        logger.error("Failed to create teleop interface")
        exit(1)
    return teleop_interface
def setup_ui(label_text: str, env: gym.Env) -> InstructionDisplay:
    """Create the instruction display and, outside XR, a small status window.

    Args:
        label_text: Text to display showing current recording status
        env: The environment instance for which UI is being created

    Returns:
        InstructionDisplay: The configured instruction display object
    """
    display = InstructionDisplay(args_cli.xr)
    if args_cli.xr:
        return display
    # Desktop mode: show the demo counter and subtask hints in a UI window.
    window = EmptyWindow(env, "Instruction")
    with window.ui_window_elements["main_vstack"]:
        demo_label = ui.Label(label_text)
        subtask_label = ui.Label("")
        display.set_labels(subtask_label, demo_label)
    return display
def process_success_condition(env: gym.Env, success_term: object | None, success_step_count: int) -> tuple[int, bool]:
    """Track consecutive success steps and export the episode when enough accrue.

    Args:
        env: The environment instance to check
        success_term: The success termination object or None if not available
        success_step_count: Current count of consecutive successful steps

    Returns:
        tuple[int, bool]: A tuple containing:
            - updated success_step_count: The updated count of consecutive successful steps
            - success_reset_needed: Boolean indicating if reset is needed due to success
    """
    if success_term is None:
        return success_step_count, False
    # The streak resets as soon as the success condition is not met.
    if not bool(success_term.func(env, **success_term.params)[0]):
        return 0, False
    success_step_count += 1
    if success_step_count < args_cli.num_success_steps:
        return success_step_count, False
    # Enough consecutive successes: mark the episode successful and export it.
    env.recorder_manager.record_pre_reset([0], force_export_or_skip=False)
    env.recorder_manager.set_success_to_episodes([0], torch.tensor([[True]], dtype=torch.bool, device=env.device))
    env.recorder_manager.export_episodes([0])
    print("Success condition met! Recording completed.")
    return success_step_count, True
def handle_reset(
    env: gym.Env, success_step_count: int, instruction_display: InstructionDisplay, label_text: str
) -> int:
    """Handle resetting the environment.

    Resets the simulation, recorder state, and environment in turn, then
    refreshes the instruction display.

    Args:
        env: The environment instance to reset
        success_step_count: Current count of consecutive successful steps (discarded)
        instruction_display: The display object to update
        label_text: Text to display showing current recording status

    Returns:
        int: Reset success step count (always 0)
    """
    print("Resetting environment...")
    env.sim.reset()
    env.recorder_manager.reset()
    env.reset()
    instruction_display.show_demo(label_text)
    return 0
def run_simulation_loop(
    env: gym.Env,
    teleop_interface: object | None,
    success_term: object | None,
    rate_limiter: RateLimiter | None,
) -> int:
    """Run the main simulation loop for collecting demonstrations.

    Sets up callback functions for teleop device, initializes the UI,
    and runs the main loop that processes user inputs and environment steps.
    Records demonstrations when success conditions are met.

    Args:
        env: The environment instance
        teleop_interface: Optional teleop interface. NOTE: currently always
            re-created internally via setup_teleop_device; the argument is ignored.
        success_term: The success termination object or None if not available
        rate_limiter: Optional rate limiter to control simulation speed

    Returns:
        int: Number of successful demonstrations recorded
    """
    current_recorded_demo_count = 0
    success_step_count = 0
    should_reset_recording_instance = False
    running_recording_instance = not args_cli.xr

    # Callback closures for the teleop device
    def reset_recording_instance():
        nonlocal should_reset_recording_instance
        should_reset_recording_instance = True
        print("Recording instance reset requested")

    def start_recording_instance():
        nonlocal running_recording_instance
        running_recording_instance = True
        print("Recording started")

    def stop_recording_instance():
        nonlocal running_recording_instance
        running_recording_instance = False
        print("Recording paused")

    def save_success_and_reset():
        """'L' key: manually mark the current episode as successful, export it, then reset."""
        nonlocal should_reset_recording_instance
        print("[record] Manually marking episode as success and saving...")
        env.recorder_manager.record_pre_reset([0], force_export_or_skip=False)
        env.recorder_manager.set_success_to_episodes(
            [0], torch.tensor([[True]], dtype=torch.bool, device=env.device)
        )
        env.recorder_manager.export_episodes([0])
        should_reset_recording_instance = True
        print("[record] Episode saved successfully!")

    # Set up teleoperation callbacks
    teleoperation_callbacks = {
        "R": reset_recording_instance,
        "L": save_success_and_reset,
        "START": start_recording_instance,
        "STOP": stop_recording_instance,
        "RESET": reset_recording_instance,
    }
    teleop_interface = setup_teleop_device(teleoperation_callbacks)
    # Reset before starting
    env.sim.reset()
    env.reset()
    teleop_interface.reset()
    label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
    instruction_display = setup_ui(label_text, env)
    subtasks = {}
    # BUGFIX: the original used `suppress(KeyboardInterrupt) and torch.inference_mode()`,
    # which evaluates to only torch.inference_mode() (the suppress object is truthy),
    # so Ctrl+C was never suppressed. The comma enters BOTH context managers.
    with contextlib.suppress(KeyboardInterrupt), torch.inference_mode():
        while simulation_app.is_running():
            # Get keyboard command
            action = teleop_interface.advance()
            # Expand to batch dimension
            actions = action.repeat(env.num_envs, 1)
            # Perform action on environment
            if running_recording_instance:
                # Compute actions based on environment
                obv = env.step(actions)
                if subtasks is not None:
                    if subtasks == {}:
                        subtasks = obv[0].get("subtask_terms")
                    elif subtasks:
                        show_subtask_instructions(instruction_display, subtasks, obv, env.cfg)
            else:
                env.sim.render()
            # Check for success condition
            success_step_count, success_reset_needed = process_success_condition(env, success_term, success_step_count)
            if success_reset_needed:
                should_reset_recording_instance = True
            # Update demo count if it has changed
            if env.recorder_manager.exported_successful_episode_count > current_recorded_demo_count:
                current_recorded_demo_count = env.recorder_manager.exported_successful_episode_count
                label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
                print(label_text)
            # Check if we've reached the desired number of demos
            if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos:
                label_text = f"All {current_recorded_demo_count} demonstrations recorded.\nExiting the app."
                instruction_display.show_demo(label_text)
                print(label_text)
                # Keep rendering briefly so the final message is visible.
                target_time = time.time() + 0.8
                while time.time() < target_time:
                    if rate_limiter:
                        rate_limiter.sleep(env)
                    else:
                        env.sim.render()
                break
            # Handle reset if requested
            if should_reset_recording_instance:
                success_step_count = handle_reset(env, success_step_count, instruction_display, label_text)
                should_reset_recording_instance = False
            # Check if simulation is stopped
            if env.sim.is_stopped():
                break
            # Rate limiting
            if rate_limiter:
                rate_limiter.sleep(env)
    return current_recorded_demo_count
def main() -> None:
    """Collect demonstrations from the environment using teleop interfaces.

    Main function that orchestrates the entire process:
    1. Sets up rate limiting based on configuration
    2. Creates output directories for saving demonstrations
    3. Configures the environment
    4. Runs the simulation loop to collect demonstrations
    5. Cleans up resources when done

    Raises:
        Exception: Propagates exceptions from any of the called functions
    """
    # if handtracking is selected, rate limiting is achieved via OpenXR
    if args_cli.xr:
        rate_limiter = None
        from isaaclab.ui.xr_widgets import TeleopVisualizationManager, XRVisualization
        # Assign the teleop visualization manager to the visualization system
        XRVisualization.assign_manager(TeleopVisualizationManager)
    else:
        rate_limiter = RateLimiter(args_cli.step_hz)
    # Set up output directories
    output_dir, output_file_name = setup_output_directories()
    # Create and configure environment
    global env_cfg  # Make env_cfg available to setup_teleop_device
    env_cfg, success_term = create_environment_config(output_dir, output_file_name)
    # Create environment
    env = create_environment(env_cfg)
    # Run simulation loop (teleop interface is created inside the loop)
    current_recorded_demo_count = run_simulation_loop(env, None, success_term, rate_limiter)
    # Clean up
    env.close()
    print(f"Recording session completed with {current_recorded_demo_count} successful demonstrations")
    print(f"Demonstrations saved to: {args_cli.dataset_file}")


if __name__ == "__main__":
    # run the main function
    main()
    # close sim app
    simulation_app.close()

View File

@@ -0,0 +1,315 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to replay demonstrations with Isaac Lab environments."""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Replay demonstrations in Isaac Lab environments.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to replay episodes.")
parser.add_argument("--task", type=str, default=None, help="Force to use the specified task.")
parser.add_argument(
"--select_episodes",
type=int,
nargs="+",
default=[],
help="A list of episode indices to be replayed. Keep empty to replay all in the dataset file.",
)
parser.add_argument("--dataset_file", type=str, default="datasets/dataset.hdf5", help="Dataset file to be replayed.")
parser.add_argument(
"--validate_states",
action="store_true",
default=False,
help=(
"Validate if the states, if available, match between loaded from datasets and replayed. Only valid if"
" --num_envs is 1."
),
)
parser.add_argument(
"--validate_success_rate",
action="store_true",
default=False,
help="Validate the replay success rate using the task environment termination criteria",
)
parser.add_argument(
"--enable_pinocchio",
action="store_true",
default=False,
help="Enable Pinocchio.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# args_cli.headless = True
if args_cli.enable_pinocchio:
# Import pinocchio before AppLauncher to force the use of the version
# installed by IsaacLab and not the one installed by Isaac Sim.
# pinocchio is required by the Pink IK controllers and the GR1T2 retargeter
import pinocchio # noqa: F401
# launch the simulator
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import contextlib
import os
import gymnasium as gym
import torch
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
from isaaclab.utils.datasets import EpisodeData, HDF5DatasetFileHandler
if args_cli.enable_pinocchio:
import isaaclab_tasks.manager_based.locomanipulation.pick_place # noqa: F401
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401 — registers Isaac-MindRobot-* environments
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# Playback pause flag, toggled by the keyboard callbacks below.
is_paused = False


def play_cb():
    """Resume playback (bound to a key by the caller)."""
    global is_paused
    is_paused = False


def pause_cb():
    """Pause playback (bound to a key by the caller)."""
    global is_paused
    is_paused = True
def compare_states(state_from_dataset, runtime_state, runtime_env_index) -> tuple[bool, str]:
    """Compare states from dataset and runtime.

    Iterates every articulation and rigid-object state vector present in
    ``runtime_state`` and compares it element-wise (absolute tolerance 0.01)
    against the corresponding vector in ``state_from_dataset``.

    Args:
        state_from_dataset: State from dataset.
        runtime_state: State from runtime.
        runtime_env_index: Index of the environment in the runtime states to be compared.

    Returns:
        bool: True if states match, False otherwise.
        str: Log message if states don't match.

    Raises:
        ValueError: If a state vector's length differs between dataset and runtime.
    """
    # NOTE: the original annotation was `-> (bool, str)`, which evaluates to a tuple
    # *instance* and is rejected by type checkers; `tuple[bool, str]` is the valid hint.
    states_matched = True
    output_log = ""
    for asset_type in ["articulation", "rigid_object"]:
        for asset_name in runtime_state[asset_type].keys():
            for state_name in runtime_state[asset_type][asset_name].keys():
                runtime_asset_state = runtime_state[asset_type][asset_name][state_name][runtime_env_index]
                dataset_asset_state = state_from_dataset[asset_type][asset_name][state_name]
                if len(dataset_asset_state) != len(runtime_asset_state):
                    raise ValueError(f"State shape of {state_name} for asset {asset_name} don't match")
                # Element-wise comparison with a fixed absolute tolerance.
                for i in range(len(dataset_asset_state)):
                    if abs(dataset_asset_state[i] - runtime_asset_state[i]) > 0.01:
                        states_matched = False
                        output_log += f'\tState ["{asset_type}"]["{asset_name}"]["{state_name}"][{i}] don\'t match\r\n'
                        output_log += f"\t Dataset:\t{dataset_asset_state[i]}\r\n"
                        output_log += f"\t Runtime: \t{runtime_asset_state[i]}\r\n"
    return states_matched, output_log
def main():
    """Replay episodes loaded from a file.

    Loads episodes from ``args_cli.dataset_file``, replays their recorded actions
    in the task environment, and optionally validates replayed states
    (``--validate_states``) and/or the success rate (``--validate_success_rate``).
    """
    global is_paused
    # Load dataset
    if not os.path.exists(args_cli.dataset_file):
        raise FileNotFoundError(f"The dataset file {args_cli.dataset_file} does not exist.")
    dataset_file_handler = HDF5DatasetFileHandler()
    dataset_file_handler.open(args_cli.dataset_file)
    env_name = dataset_file_handler.get_env_name()
    episode_count = dataset_file_handler.get_num_episodes()
    if episode_count == 0:
        print("No episodes found in the dataset.")
        exit()
    episode_indices_to_replay = args_cli.select_episodes
    if len(episode_indices_to_replay) == 0:
        episode_indices_to_replay = list(range(episode_count))
    # CLI --task overrides the env name recorded in the dataset.
    if args_cli.task is not None:
        env_name = args_cli.task.split(":")[-1]
    if env_name is None:
        raise ValueError("Task/env name was not specified nor found in the dataset.")
    num_envs = args_cli.num_envs
    env_cfg = parse_env_cfg(env_name, device=args_cli.device, num_envs=num_envs)
    # extract success checking function to invoke in the main loop
    success_term = None
    if args_cli.validate_success_rate:
        if hasattr(env_cfg.terminations, "success"):
            success_term = env_cfg.terminations.success
            env_cfg.terminations.success = None
        else:
            print(
                "No success termination term was found in the environment."
                " Will not be able to mark recorded demos as successful."
            )
    # Disable all recorders and terminations
    env_cfg.recorders = {}
    env_cfg.terminations = {}
    # create environment from loaded config
    env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
    teleop_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.1, rot_sensitivity=0.1))
    teleop_interface.add_callback("N", play_cb)
    teleop_interface.add_callback("B", pause_cb)
    print('Press "B" to pause and "N" to resume the replayed actions.')
    # Determine if state validation should be conducted
    state_validation_enabled = False
    if args_cli.validate_states and num_envs == 1:
        state_validation_enabled = True
    elif args_cli.validate_states and num_envs > 1:
        print("Warning: State validation is only supported with a single environment. Skipping state validation.")
    # Get idle action (idle actions are applied to envs without next action)
    if hasattr(env_cfg, "idle_action"):
        idle_action = env_cfg.idle_action.repeat(num_envs, 1)
    else:
        idle_action = torch.zeros(env.action_space.shape)
    # reset before starting
    env.reset()
    teleop_interface.reset()
    # simulate environment -- run everything in inference mode
    episode_names = list(dataset_file_handler.get_episode_names())
    replayed_episode_count = 0
    recorded_episode_count = 0
    # Track current episode indices for each environment
    current_episode_indices = [None] * num_envs
    # Track failed demo IDs
    failed_demo_ids = []
    # FIX: the original `with contextlib.suppress(...) and torch.inference_mode():`
    # discarded the suppress() manager (`and` returns its second operand), so
    # KeyboardInterrupt was never actually suppressed. Enter both managers instead.
    with contextlib.suppress(KeyboardInterrupt), torch.inference_mode():
        while simulation_app.is_running() and not simulation_app.is_exiting():
            env_episode_data_map = {index: EpisodeData() for index in range(num_envs)}
            first_loop = True
            has_next_action = True
            episode_ended = [False] * num_envs
            while has_next_action:
                # initialize actions with idle action so those without next action will not move.
                # FIX: clone() so the per-env writes below do not mutate idle_action itself
                # (the original aliased the tensor, silently replacing "idle" with the last action).
                actions = idle_action.clone()
                has_next_action = False
                for env_id in range(num_envs):
                    env_next_action = env_episode_data_map[env_id].get_next_action()
                    if env_next_action is None:
                        # check if the episode is successful after the whole episode_data is
                        if (
                            (success_term is not None)
                            and (current_episode_indices[env_id]) is not None
                            and (not episode_ended[env_id])
                        ):
                            if bool(success_term.func(env, **success_term.params)[env_id]):
                                recorded_episode_count += 1
                                plural_trailing_s = "s" if recorded_episode_count > 1 else ""
                                print(
                                    f"Successfully replayed {recorded_episode_count} episode{plural_trailing_s} out"
                                    f" of {replayed_episode_count} demos."
                                )
                            else:
                                # if not successful, add to failed demo IDs list
                                if (
                                    current_episode_indices[env_id] is not None
                                    and current_episode_indices[env_id] not in failed_demo_ids
                                ):
                                    failed_demo_ids.append(current_episode_indices[env_id])
                            episode_ended[env_id] = True
                        # pull the next requested (and in-range) episode index, if any
                        next_episode_index = None
                        while episode_indices_to_replay:
                            next_episode_index = episode_indices_to_replay.pop(0)
                            if next_episode_index < episode_count:
                                episode_ended[env_id] = False
                                break
                            next_episode_index = None
                        if next_episode_index is not None:
                            replayed_episode_count += 1
                            current_episode_indices[env_id] = next_episode_index
                            print(f"{replayed_episode_count:4}: Loading #{next_episode_index} episode to env_{env_id}")
                            episode_data = dataset_file_handler.load_episode(
                                episode_names[next_episode_index], env.device
                            )
                            env_episode_data_map[env_id] = episode_data
                            # Set initial state for the new episode
                            initial_state = episode_data.get_initial_state()
                            env.reset_to(initial_state, torch.tensor([env_id], device=env.device), is_relative=True)
                            # Get the first action for the new episode
                            env_next_action = env_episode_data_map[env_id].get_next_action()
                            has_next_action = True
                        else:
                            # nothing left for this env; keep its idle action
                            continue
                    else:
                        has_next_action = True
                    actions[env_id] = env_next_action
                if first_loop:
                    first_loop = False
                else:
                    # Busy-wait while paused, keeping the viewport alive.
                    while is_paused:
                        env.sim.render()
                        continue
                env.step(actions)
                if state_validation_enabled:
                    state_from_dataset = env_episode_data_map[0].get_next_state()
                    if state_from_dataset is not None:
                        print(
                            f"Validating states at action-index: {env_episode_data_map[0].next_state_index - 1:4}",
                            end="",
                        )
                        current_runtime_state = env.scene.get_state(is_relative=True)
                        states_matched, comparison_log = compare_states(state_from_dataset, current_runtime_state, 0)
                        if states_matched:
                            print("\t- matched.")
                        else:
                            print("\t- mismatched.")
                            print(comparison_log)
            # all episodes replayed: leave the simulation loop
            break
    # Close environment after replay in complete
    plural_trailing_s = "s" if replayed_episode_count > 1 else ""
    print(f"Finished replaying {replayed_episode_count} episode{plural_trailing_s}.")
    # Print success statistics only if validation was enabled
    if success_term is not None:
        print(f"Successfully replayed: {recorded_episode_count}/{replayed_episode_count}")
    # Print failed demo IDs if any
    if failed_demo_ids:
        print(f"\nFailed demo IDs ({len(failed_demo_ids)} total):")
        print(f"  {sorted(failed_demo_ids)}")
    env.close()
# Script entry point: replay all requested episodes, then shut down the simulator.
if __name__ == "__main__":
    # run the main function
    main()
    # close sim app
    simulation_app.close()

View File

@@ -0,0 +1,169 @@
# Copyright (c) 2024-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test cases for Cosmos prompt generation script."""
import json
import os
import tempfile
import pytest
from scripts.tools.cosmos.cosmos_prompt_gen import generate_prompt, main
@pytest.fixture(scope="class")
def temp_templates_file():
    """Yield the path of a throwaway JSON templates file; delete it afterwards."""
    handle = tempfile.NamedTemporaryFile(suffix=".json", delete=False)  # noqa: SIM115
    path = handle.name
    # Template sections the tests exercise, including two degenerate ones.
    sections = {
        "lighting": ["with bright lighting", "with dim lighting", "with natural lighting"],
        "color": ["in warm colors", "in cool colors", "in vibrant colors"],
        "style": ["in a realistic style", "in an artistic style", "in a minimalist style"],
        "empty_section": [],  # Test empty section
        "invalid_section": "not a list",  # Test invalid section
    }
    with open(path, "w") as f:
        json.dump(sections, f)
    yield path
    # Cleanup
    os.remove(path)
@pytest.fixture
def temp_output_file():
    """Yield a throwaway .txt path; remove the file once the test is done."""
    handle = tempfile.NamedTemporaryFile(suffix=".txt", delete=False)  # noqa: SIM115
    yield handle.name
    # Cleanup
    os.remove(handle.name)
class TestCosmosPromptGen:
    """Test cases for Cosmos prompt generation functionality.

    NOTE: the test_main_* cases patch ``sys.argv`` in place (restored in a
    ``finally`` block), so they rely on single-process, sequential execution.
    """

    def test_generate_prompt_valid_templates(self, temp_templates_file):
        """Test generating a prompt with valid templates."""
        prompt = generate_prompt(temp_templates_file)
        # Check that prompt is a string
        assert isinstance(prompt, str)
        # Check that prompt contains at least one word
        assert len(prompt.split()) > 0
        # Check that prompt contains valid sections
        valid_sections = ["lighting", "color", "style"]
        found_sections = [section for section in valid_sections if section in prompt.lower()]
        assert len(found_sections) > 0

    def test_generate_prompt_invalid_file(self):
        """Test generating a prompt with invalid file path."""
        with pytest.raises(FileNotFoundError):
            generate_prompt("nonexistent_file.json")

    def test_generate_prompt_invalid_json(self):
        """Test generating a prompt with invalid JSON file."""
        # Create a temporary file with invalid JSON
        with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as temp_file:
            temp_file.write(b"invalid json content")
            temp_file.flush()
        try:
            with pytest.raises(ValueError):
                generate_prompt(temp_file.name)
        finally:
            os.remove(temp_file.name)

    def test_main_function_single_prompt(self, temp_templates_file, temp_output_file):
        """Test main function with single prompt generation."""
        # Mock command line arguments
        import sys

        original_argv = sys.argv
        sys.argv = [
            "cosmos_prompt_gen.py",
            "--templates_path",
            temp_templates_file,
            "--num_prompts",
            "1",
            "--output_path",
            temp_output_file,
        ]
        try:
            main()
            # Check if output file was created
            assert os.path.exists(temp_output_file)
            # Check content of output file
            with open(temp_output_file) as f:
                content = f.read().strip()
                assert len(content) > 0
                # Exactly one prompt requested -> exactly one line expected.
                assert len(content.split("\n")) == 1
        finally:
            # Restore original argv
            sys.argv = original_argv

    def test_main_function_multiple_prompts(self, temp_templates_file, temp_output_file):
        """Test main function with multiple prompt generation."""
        # Mock command line arguments
        import sys

        original_argv = sys.argv
        sys.argv = [
            "cosmos_prompt_gen.py",
            "--templates_path",
            temp_templates_file,
            "--num_prompts",
            "3",
            "--output_path",
            temp_output_file,
        ]
        try:
            main()
            # Check if output file was created
            assert os.path.exists(temp_output_file)
            # Check content of output file
            with open(temp_output_file) as f:
                content = f.read().strip()
                assert len(content) > 0
                assert len(content.split("\n")) == 3
                # Check that each line is a valid prompt
                for line in content.split("\n"):
                    assert len(line) > 0
        finally:
            # Restore original argv
            sys.argv = original_argv

    def test_main_function_default_output(self, temp_templates_file):
        """Test main function with default output path."""
        # Mock command line arguments
        import sys

        original_argv = sys.argv
        sys.argv = ["cosmos_prompt_gen.py", "--templates_path", temp_templates_file, "--num_prompts", "1"]
        try:
            main()
            # Check if default output file was created
            # NOTE(review): "prompts.txt" lands in the current working directory — depends on where pytest runs.
            assert os.path.exists("prompts.txt")
            # Clean up default output file
            os.remove("prompts.txt")
        finally:
            # Restore original argv
            sys.argv = original_argv

View File

@@ -0,0 +1,173 @@
# Copyright (c) 2024-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test cases for HDF5 to MP4 conversion script."""
import os
import tempfile
import h5py
import numpy as np
import pytest
from scripts.tools.hdf5_to_mp4 import get_num_demos, main, write_demo_to_mp4
@pytest.fixture(scope="class")
def temp_hdf5_file():
    """Yield the path of a temporary HDF5 file pre-populated with two small demos."""
    handle = tempfile.NamedTemporaryFile(suffix=".h5", delete=False)  # noqa: SIM115
    with h5py.File(handle.name, "w") as h5f:
        # Two demos, each with 2 frames per camera modality.
        for idx in range(2):
            obs = h5f.create_group(f"data/demo_{idx}/obs")
            # RGB frames
            obs.create_dataset(
                "table_cam", data=np.random.randint(0, 255, (2, 704, 1280, 3), dtype=np.uint8)
            )
            # Segmentation frames (RGBA)
            obs.create_dataset(
                "table_cam_segmentation", data=np.random.randint(0, 255, (2, 704, 1280, 4), dtype=np.uint8)
            )
            # Normal maps
            obs.create_dataset("table_cam_normals", data=np.random.rand(2, 704, 1280, 3).astype(np.float32))
            # Depth maps
            obs.create_dataset("table_cam_depth", data=np.random.rand(2, 704, 1280, 1).astype(np.float32))
    yield handle.name
    # Cleanup
    os.remove(handle.name)
@pytest.fixture
def temp_output_dir():
    """Yield a temporary directory for generated MP4s; recursively delete it afterwards.

    Uses ``shutil.rmtree`` instead of per-file ``os.remove`` + ``os.rmdir`` so that
    cleanup does not crash if the code under test ever writes a nested directory
    (``os.remove`` raises on directories and ``os.rmdir`` requires an empty dir).
    """
    import shutil

    temp_dir = tempfile.mkdtemp()
    yield temp_dir
    # Cleanup: remove the directory and everything written into it.
    shutil.rmtree(temp_dir)
class TestHDF5ToMP4:
    """Test cases for HDF5 to MP4 conversion functionality.

    Output filenames are expected to follow the ``demo_<id>_<key>.mp4`` pattern
    produced by ``write_demo_to_mp4``.
    """

    def test_get_num_demos(self, temp_hdf5_file):
        """Test the get_num_demos function."""
        num_demos = get_num_demos(temp_hdf5_file)
        # The fixture creates exactly two demo groups.
        assert num_demos == 2

    def test_write_demo_to_mp4_rgb(self, temp_hdf5_file, temp_output_dir):
        """Test writing RGB frames to MP4."""
        write_demo_to_mp4(temp_hdf5_file, 0, "data/demo_0/obs", "table_cam", temp_output_dir, 704, 1280)
        output_file = os.path.join(temp_output_dir, "demo_0_table_cam.mp4")
        assert os.path.exists(output_file)
        assert os.path.getsize(output_file) > 0

    def test_write_demo_to_mp4_segmentation(self, temp_hdf5_file, temp_output_dir):
        """Test writing segmentation frames to MP4."""
        write_demo_to_mp4(temp_hdf5_file, 0, "data/demo_0/obs", "table_cam_segmentation", temp_output_dir, 704, 1280)
        output_file = os.path.join(temp_output_dir, "demo_0_table_cam_segmentation.mp4")
        assert os.path.exists(output_file)
        assert os.path.getsize(output_file) > 0

    def test_write_demo_to_mp4_normals(self, temp_hdf5_file, temp_output_dir):
        """Test writing normal maps to MP4."""
        write_demo_to_mp4(temp_hdf5_file, 0, "data/demo_0/obs", "table_cam_normals", temp_output_dir, 704, 1280)
        output_file = os.path.join(temp_output_dir, "demo_0_table_cam_normals.mp4")
        assert os.path.exists(output_file)
        assert os.path.getsize(output_file) > 0

    def test_write_demo_to_mp4_shaded_segmentation(self, temp_hdf5_file, temp_output_dir):
        """Test writing shaded_segmentation frames to MP4.

        NOTE(review): the fixture never creates a "table_cam_shaded_segmentation"
        dataset, so this presumably expects write_demo_to_mp4 to derive it from the
        rgb + segmentation datasets — confirm against the implementation.
        """
        write_demo_to_mp4(
            temp_hdf5_file,
            0,
            "data/demo_0/obs",
            "table_cam_shaded_segmentation",
            temp_output_dir,
            704,
            1280,
        )
        output_file = os.path.join(temp_output_dir, "demo_0_table_cam_shaded_segmentation.mp4")
        assert os.path.exists(output_file)
        assert os.path.getsize(output_file) > 0

    def test_write_demo_to_mp4_depth(self, temp_hdf5_file, temp_output_dir):
        """Test writing depth maps to MP4."""
        write_demo_to_mp4(temp_hdf5_file, 0, "data/demo_0/obs", "table_cam_depth", temp_output_dir, 704, 1280)
        output_file = os.path.join(temp_output_dir, "demo_0_table_cam_depth.mp4")
        assert os.path.exists(output_file)
        assert os.path.getsize(output_file) > 0

    def test_write_demo_to_mp4_invalid_demo(self, temp_hdf5_file, temp_output_dir):
        """Test writing with invalid demo ID."""
        with pytest.raises(KeyError):
            write_demo_to_mp4(
                temp_hdf5_file,
                999,  # Invalid demo ID
                "data/demo_999/obs",
                "table_cam",
                temp_output_dir,
                704,
                1280,
            )

    def test_write_demo_to_mp4_invalid_key(self, temp_hdf5_file, temp_output_dir):
        """Test writing with invalid input key."""
        with pytest.raises(KeyError):
            write_demo_to_mp4(temp_hdf5_file, 0, "data/demo_0/obs", "invalid_key", temp_output_dir, 704, 1280)

    def test_main_function(self, temp_hdf5_file, temp_output_dir):
        """Test the main function."""
        # Mock command line arguments
        import sys

        original_argv = sys.argv
        sys.argv = [
            "hdf5_to_mp4.py",
            "--input_file",
            temp_hdf5_file,
            "--output_dir",
            temp_output_dir,
            "--input_keys",
            "table_cam",
            "table_cam_segmentation",
            "--video_height",
            "704",
            "--video_width",
            "1280",
            "--framerate",
            "30",
        ]
        try:
            main()
            # Check if output files were created (2 demos x 2 requested keys)
            expected_files = [
                "demo_0_table_cam.mp4",
                "demo_0_table_cam_segmentation.mp4",
                "demo_1_table_cam.mp4",
                "demo_1_table_cam_segmentation.mp4",
            ]
            for file in expected_files:
                output_file = os.path.join(temp_output_dir, file)
                assert os.path.exists(output_file)
                assert os.path.getsize(output_file) > 0
        finally:
            # Restore original argv
            sys.argv = original_argv

View File

@@ -0,0 +1,181 @@
# Copyright (c) 2024-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test cases for MP4 to HDF5 conversion script."""
import os
import tempfile
import cv2
import h5py
import numpy as np
import pytest
from scripts.tools.mp4_to_hdf5 import get_frames_from_mp4, main, process_video_and_demo
@pytest.fixture(scope="class")
def temp_hdf5_file():
    """Yield a temporary HDF5 dataset holding two demos with actions, robot state and cameras."""
    handle = tempfile.NamedTemporaryFile(suffix=".h5", delete=False)  # noqa: SIM115
    with h5py.File(handle.name, "w") as h5f:
        # Two demos, 10 samples each.
        for idx in range(2):
            demo = h5f.create_group(f"data/demo_{idx}")
            obs = demo.create_group("obs")
            # Recorded actions
            demo.create_dataset("actions", data=np.random.rand(10, 7).astype(np.float32))
            # End-effector pose and gripper state
            obs.create_dataset("eef_pos", data=np.random.rand(10, 3).astype(np.float32))
            obs.create_dataset("eef_quat", data=np.random.rand(10, 4).astype(np.float32))
            obs.create_dataset("gripper_pos", data=np.random.rand(10, 1).astype(np.float32))
            # Camera streams
            obs.create_dataset(
                "table_cam", data=np.random.randint(0, 255, (10, 704, 1280, 3), dtype=np.uint8)
            )
            obs.create_dataset(
                "wrist_cam", data=np.random.randint(0, 255, (10, 704, 1280, 3), dtype=np.uint8)
            )
            # Sample-count attribute used by the converter
            demo.attrs["num_samples"] = 10
    yield handle.name
    # Cleanup
    os.remove(handle.name)
@pytest.fixture(scope="class")
def temp_videos_dir():
    """Yield ``(directory, [video paths])`` with two small synthetic MP4s; delete them afterwards."""
    videos_root = tempfile.mkdtemp()  # noqa: SIM115
    paths = []
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    for idx in range(2):
        clip_path = os.path.join(videos_root, f"demo_{idx}_table_cam.mp4")
        paths.append(clip_path)
        # Write 10 random 1280x704 frames at 30 fps.
        writer = cv2.VideoWriter(clip_path, fourcc, 30, (1280, 704))
        for _ in range(10):
            writer.write(np.random.randint(0, 255, (704, 1280, 3), dtype=np.uint8))
        writer.release()
    yield videos_root, paths
    # Cleanup
    for clip_path in paths:
        os.remove(clip_path)
    os.rmdir(videos_root)
@pytest.fixture
def temp_output_file():
    """Yield a throwaway .h5 path; remove the file once the test is done."""
    handle = tempfile.NamedTemporaryFile(suffix=".h5", delete=False)  # noqa: SIM115
    yield handle.name
    # Cleanup
    os.remove(handle.name)
class TestMP4ToHDF5:
    """Test cases for MP4 to HDF5 conversion functionality."""

    def test_get_frames_from_mp4(self, temp_videos_dir):
        """Test extracting frames from MP4 video."""
        _, video_paths = temp_videos_dir
        frames = get_frames_from_mp4(video_paths[0])
        # Check frame properties
        assert frames.shape[0] == 10  # Number of frames
        assert frames.shape[1:] == (704, 1280, 3)  # Frame dimensions
        assert frames.dtype == np.uint8  # Data type

    def test_get_frames_from_mp4_resize(self, temp_videos_dir):
        """Test extracting frames with resizing."""
        _, video_paths = temp_videos_dir
        target_height, target_width = 352, 640
        frames = get_frames_from_mp4(video_paths[0], target_height, target_width)
        # Check resized frame properties
        assert frames.shape[0] == 10  # Number of frames
        assert frames.shape[1:] == (target_height, target_width, 3)  # Resized dimensions
        assert frames.dtype == np.uint8  # Data type

    def test_process_video_and_demo(self, temp_hdf5_file, temp_videos_dir, temp_output_file):
        """Test processing a single video and creating a new demo."""
        _, video_paths = temp_videos_dir
        # New demo index 2 is appended after the 2 existing demos.
        with h5py.File(temp_hdf5_file, "r") as f_in, h5py.File(temp_output_file, "w") as f_out:
            process_video_and_demo(f_in, f_out, video_paths[0], 0, 2)
            # Check if new demo was created with correct data
            assert "data/demo_2" in f_out
            assert "data/demo_2/actions" in f_out
            assert "data/demo_2/obs/eef_pos" in f_out
            assert "data/demo_2/obs/eef_quat" in f_out
            assert "data/demo_2/obs/gripper_pos" in f_out
            assert "data/demo_2/obs/table_cam" in f_out
            assert "data/demo_2/obs/wrist_cam" in f_out
            # Check data shapes
            assert f_out["data/demo_2/actions"].shape == (10, 7)
            assert f_out["data/demo_2/obs/eef_pos"].shape == (10, 3)
            assert f_out["data/demo_2/obs/eef_quat"].shape == (10, 4)
            assert f_out["data/demo_2/obs/gripper_pos"].shape == (10, 1)
            assert f_out["data/demo_2/obs/table_cam"].shape == (10, 704, 1280, 3)
            assert f_out["data/demo_2/obs/wrist_cam"].shape == (10, 704, 1280, 3)
            # Check attributes
            assert f_out["data/demo_2"].attrs["num_samples"] == 10

    def test_main_function(self, temp_hdf5_file, temp_videos_dir, temp_output_file):
        """Test the main function."""
        # Mock command line arguments
        import sys

        original_argv = sys.argv
        sys.argv = [
            "mp4_to_hdf5.py",
            "--input_file",
            temp_hdf5_file,
            "--videos_dir",
            temp_videos_dir[0],  # first element of the (dir, paths) fixture tuple
            "--output_file",
            temp_output_file,
        ]
        try:
            main()
            # Check if output file was created with correct data
            with h5py.File(temp_output_file, "r") as f:
                # Check if original demos were copied
                assert "data/demo_0" in f
                assert "data/demo_1" in f
                # Check if new demos were created
                assert "data/demo_2" in f
                assert "data/demo_3" in f
                # Check data in new demos
                for demo_id in [2, 3]:
                    assert f"data/demo_{demo_id}/actions" in f
                    assert f"data/demo_{demo_id}/obs/eef_pos" in f
                    assert f"data/demo_{demo_id}/obs/eef_quat" in f
                    assert f"data/demo_{demo_id}/obs/gripper_pos" in f
                    assert f"data/demo_{demo_id}/obs/table_cam" in f
                    assert f"data/demo_{demo_id}/obs/wrist_cam" in f
        finally:
            # Restore original argv
            sys.argv = original_argv

View File

@@ -0,0 +1,414 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to manage pretrained checkpoints for Isaac Lab environments.
This script is used to train and publish pretrained checkpoints for Isaac Lab environments.
It supports multiple workflows: rl_games, rsl_rl, sb3, and skrl.
* To train an agent using the rl_games workflow on the Isaac-Cartpole-v0 environment:
.. code-block:: shell
python scripts/tools/train_and_publish_checkpoints.py --train rl_games:Isaac-Cartpole-v0
* To train and publish the checkpoints for all workflows on only the direct Cartpole environments:
.. code-block:: shell
python scripts/tools/train_and_publish_checkpoints.py \
-tp "*:Isaac-Cartpole-*Direct-v0" \
--/persistent/isaaclab/asset_root/pretrained_checkpoints="/some/path"
* To review all repose cube jobs, excluding the 'Play' tasks and 'skrl' workflows:
.. code-block:: shell
python scripts/tools/train_and_publish_checkpoints.py \
-r "*:*Repose-Cube*" \
--exclude "*:*Play*" \
--exclude skrl:*
* To publish all results (that have been reviewed and approved).
.. code-block:: shell
python scripts/tools/train_and_publish_checkpoints.py \
--publish --all \
--/persistent/isaaclab/asset_root/pretrained_checkpoints="/some/path"
"""
import argparse
from isaaclab.app import AppLauncher
# Initialize the parser
parser = argparse.ArgumentParser(
    description="""
Script for training and publishing pre-trained checkpoints in Isaac Lab.
Examples:
# Train an agent using the rl_games workflow for the Isaac-Cartpole-v0 environment.
train_and_publish_checkpoints.py --train rl_games:Isaac-Cartpole-v0
# Train and publish checkpoints for all workflows, targeting only direct Cartpole environments.
train_and_publish_checkpoints.py -tp "*:Isaac-Cartpole-*Direct-v0" \\
    --/persistent/isaaclab/asset_root/pretrained_checkpoints="/some/path"
# Review all Repose Cube jobs, excluding Play tasks and skrl jobs.
train_and_publish_checkpoints.py -r "*:*Repose-Cube*" --exclude "*:*Play*" --exclude skrl:*
# Publish all results that have been reviewed and approved.
train_and_publish_checkpoints.py --publish --all \\
    --/persistent/isaaclab/asset_root/pretrained_checkpoints="/some/path"
""",
    formatter_class=argparse.RawTextHelpFormatter,
)
# Add positional arguments that can accept zero or more values
parser.add_argument(
    "jobs",
    nargs="*",
    help="""
A job consists of a workflow and a task name, separated by a colon (wildcards are optional). Examples:
    rl_games:Isaac-Humanoid-*v0  # Wildcard for any Humanoid version
    rsl_rl:Isaac-Ant-*-v0  # Wildcard for any Ant environment
    *:Isaac-Velocity-Flat-Spot-v0  # Wildcard for any workflow, specific task
Wildcards can be used in either the workflow or task name to match multiple entries.
""",
)
parser.add_argument("-t", "--train", action="store_true", help="Train checkpoints for later publishing.")
parser.add_argument("-p", "--publish_checkpoint", action="store_true", help="Publish pre-trained checkpoints.")
parser.add_argument("-r", "--review", action="store_true", help="Review checkpoints.")
parser.add_argument("-l", "--list", action="store_true", help="List all available environments and workflows.")
parser.add_argument("-f", "--force", action="store_true", help="Force training when results already exist.")
parser.add_argument("-a", "--all", action="store_true", help="Run all valid workflow task pairs.")
parser.add_argument(
    "-E",
    "--exclude",
    action="append",
    type=str,
    default=[],
    help="Excludes jobs matching the argument, with wildcard support.",
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--force_review", action="store_true", help="Forces review when one already exists.")
parser.add_argument("--force_publish", action="store_true", help="Publish checkpoints without review.")
parser.add_argument("--headless", action="store_true", help="Run training without the UI.")
# parse_known_args so Carb-style settings (e.g. --/persistent/...) pass through untouched
args, _ = parser.parse_known_args()
# Need something to do
if len(args.jobs) == 0 and not args.all:
    parser.error("Jobs must be provided, or --all.")
# Must train, publish, review or list
if not (args.train or args.publish_checkpoint or args.review or args.list):
    parser.error("A train, publish, review or list flag must be given.")
# List excludes train and publish
if args.list and (args.train or args.publish_checkpoint):
    parser.error("Can't train or publish when listing.")
# launch omniverse app (always headless; sub-commands run their own UI sessions)
app_launcher = AppLauncher(headless=True)
simulation_app = app_launcher.app
import csv
# Now everything else
import fnmatch
import json
import os
import subprocess
import sys
import gymnasium as gym
import numpy as np
import omni.client
from omni.client._omniclient import CopyBehavior
from isaaclab_rl.utils.pretrained_checkpoint import (
WORKFLOW_EXPERIMENT_NAME_VARIABLE,
WORKFLOW_PLAYER,
WORKFLOW_TRAINER,
WORKFLOWS,
get_log_root_path,
get_pretrained_checkpoint_path,
get_pretrained_checkpoint_publish_path,
get_pretrained_checkpoint_review,
get_pretrained_checkpoint_review_path,
has_pretrained_checkpoint_job_finished,
has_pretrained_checkpoint_job_run,
has_pretrained_checkpoints_asset_root_dir,
)
# Need somewhere to publish: fail fast before doing any work if the Carb setting is missing.
if args.publish_checkpoint and not has_pretrained_checkpoints_asset_root_dir():
    raise Exception("A /persistent/isaaclab/asset_root/pretrained_checkpoints setting is required to publish.")
def train_job(workflow, task_name, headless=False, force=False, num_envs=None):
    """Train one workflow/task pair via the workflow's train.py script.

    The experiment name is overridden so each task logs into its own directory.
    Unless *force* is set, the job is skipped when a previous run already left
    results under the log root.

    Args:
        workflow: The workflow.
        task_name: The task name.
        headless: Should the training run without the UI.
        force: Run training even if previous experiments have been run.
        num_envs: How many simultaneous environments to simulate, overriding the config.
    """
    log_root_path = get_log_root_path(workflow, task_name)
    # Skip jobs whose log directory already holds results, unless forced.
    if not force and os.path.exists(log_root_path) and os.listdir(log_root_path):
        print(f"Skipping training of {workflow}:{task_name}, already has been run")
        return
    print(f"Training {workflow}:{task_name}")
    # Base command: the workflow's trainer, the task, and camera support.
    command = [
        sys.executable,
        WORKFLOW_TRAINER[workflow],
        "--task",
        task_name,
        "--enable_cameras",
    ]
    # Redirect the experiment/log directory name to the task name.
    experiment_variable = WORKFLOW_EXPERIMENT_NAME_VARIABLE[workflow]
    if experiment_variable:
        command.append(f"{experiment_variable}={task_name}")
    if headless:
        command.append("--headless")
    if num_envs:
        command += ["--num_envs", str(num_envs)]
    print("Running : " + " ".join(command))
    subprocess.run(command)
def review_pretrained_checkpoint(workflow, task_name, force_review=False, num_envs=None):
    """
    This initiates a review of the pretrained checkpoint. The play.py script for the workflow is run, and the user
    inspects the results. When done they close the simulator and will be prompted for their review.

    Args:
        workflow: The workflow.
        task_name: The task name.
        force_review: Performs the review even if a review already exists.
        num_envs: How many simultaneous environments to simulate, overriding the config.
    """
    # This workflow task pair hasn't been trained
    if not has_pretrained_checkpoint_job_run(workflow, task_name):
        print(f"Skipping review of {workflow}:{task_name}, hasn't been trained yet")
        return
    # Couldn't find the checkpoint
    if not has_pretrained_checkpoint_job_finished(workflow, task_name):
        print(f"Training not complete for {workflow}:{task_name}")
        return
    review = get_pretrained_checkpoint_review(workflow, task_name)
    if not force_review and review and review["reviewed"]:
        print(f"Review already complete for {workflow}:{task_name}")
        return
    print(f"Reviewing {workflow}:{task_name}")
    # Construct our command
    cmd = [
        sys.executable,
        WORKFLOW_PLAYER[workflow],
        "--task",
        task_name,
        "--checkpoint",
        get_pretrained_checkpoint_path(workflow, task_name),
        "--enable_cameras",
    ]
    if num_envs:
        cmd.extend(["--num_envs", str(num_envs)])
    print("Running : " + " ".join(cmd))
    # NOTE(review): the return code of play.py is not checked; a failed playback
    # still falls through to the review prompt below — confirm intended.
    subprocess.run(cmd)
    # Give user a chance to leave the old review
    if force_review and review and review["reviewed"]:
        result = review["result"]
        notes = review.get("notes")
        print(f"A review already exists for {workflow}:{task_name}, it was marked as '{result}'.")
        print(f"  Notes: {notes}")
        answer = input("Would you like to replace it? Please answer yes or no (y/n) [n]: ").strip().lower()
        if answer != "y":
            return
    # Get the verdict from the user
    print(f"Do you accept this checkpoint for {workflow}:{task_name}?")
    answer = input("Please answer yes, no or undetermined (y/n/u) [u]: ").strip().lower()
    if answer not in {"y", "n", "u"}:
        answer = "u"
    answer_map = {
        "y": "accepted",
        "n": "rejected",
        "u": "undetermined",
    }
    # Create the review dict
    review = {
        "reviewed": True,
        "result": answer_map[answer],
    }
    # Maybe add some notes
    # NOTE(review): .lower() also lowercases the user's free-form notes — confirm intended.
    notes = input("Please add notes or hit enter: ").strip().lower()
    if notes:
        review["notes"] = notes
    # Save the review JSON file
    path = get_pretrained_checkpoint_review_path(workflow, task_name)
    if not path:
        raise Exception("This shouldn't be possible, something went very wrong.")
    with open(path, "w") as f:
        json.dump(review, f, indent=4)
def publish_pretrained_checkpoint(workflow, task_name, force_publish=False):
    """Copy a finished pretrained checkpoint to its Nucleus publish location.

    The destination is derived from the asset path in the
    /persistent/isaaclab/asset_root/pretrained_checkpoints Carb variable.

    Args:
        workflow: The workflow.
        task_name: The task name.
        force_publish: Publish without review.
    """
    # Nothing to publish if the training job never ran for this pair.
    if not has_pretrained_checkpoint_job_run(workflow, task_name):
        print(f"Skipping publishing of {workflow}:{task_name}, hasn't been trained yet")
        return
    # The job ran but no final checkpoint was produced yet.
    if not has_pretrained_checkpoint_job_finished(workflow, task_name):
        print(f"Training not complete for {workflow}:{task_name}")
        return
    # Resolve the local checkpoint file to copy.
    checkpoint_path = get_pretrained_checkpoint_path(workflow, task_name)
    if not checkpoint_path:
        raise Exception("This shouldn't be possible, something went very wrong.")
    # Unless forced, only publish checkpoints whose review was accepted.
    if not force_publish:
        review_data = get_pretrained_checkpoint_review(workflow, task_name)
        if not (review_data and review_data["reviewed"]):
            print(f"Skipping publishing of {workflow}:{task_name}, hasn't been reviewed yet")
            return
        verdict = review_data["result"]
        if verdict != "accepted":
            print(f'Skipping publishing of {workflow}:{task_name}, review result was "{verdict}"')
            return
    print(f"Publishing {workflow}:{task_name}")
    # Copy the local file to the publish location, replacing any prior copy.
    destination = get_pretrained_checkpoint_publish_path(workflow, task_name)
    omni.client.copy_file(checkpoint_path, destination, CopyBehavior.OVERWRITE)
def get_job_summary_row(workflow, task_name):
    """Return one summary row: [workflow, task, ran, finished, result, notes]."""
    review = get_pretrained_checkpoint_review(workflow, task_name)
    # An absent review contributes empty result/notes columns.
    result, notes = "", ""
    if review:
        result = review.get("result", "undetermined")
        notes = review.get("notes", "")
    return [
        workflow,
        task_name,
        has_pretrained_checkpoint_job_run(workflow, task_name),
        has_pretrained_checkpoint_job_finished(workflow, task_name),
        result,
        notes,
    ]
def main():
    """Train, review, list and/or publish pretrained checkpoints for matching jobs.

    Iterates every (workflow, task) pair registered in gym, filters it against
    the job patterns requested on the command line, and dispatches to the
    train/review/publish helpers according to the parsed ``args`` flags.
    """
    # Figure out what workflows and tasks we'll be using.
    jobs = ["*:*"] if args.all else args.jobs
    if args.list:
        print()
        print("# Workflow, Task, Ran, Finished, Review, Notes")
    summary_rows = []
    # Could be implemented more efficiently, but the performance gain would be inconsequential
    for workflow in WORKFLOWS:
        for task_spec in sorted(gym.registry.values(), key=lambda t: t.id):
            job_id = f"{workflow}:{task_spec.id}"
            # We've excluded this job
            if any(fnmatch.fnmatch(job_id, e) for e in args.exclude):
                continue
            # None of our jobs match this pair.
            # (builtin any() — the previous np.any(np.array([...])) built a throwaway
            # NumPy array and defeated short-circuiting for no benefit)
            if not any(fnmatch.fnmatch(job_id, job) for job in jobs):
                continue
            # No config for this workflow
            if workflow + "_cfg_entry_point" not in task_spec.kwargs:
                continue
            if args.list:
                summary_rows.append(get_job_summary_row(workflow, task_spec.id))
                continue
            # Training, reviewing and publishing
            if args.train:
                train_job(workflow, task_spec.id, args.headless, args.force, args.num_envs)
            if args.review:
                review_pretrained_checkpoint(workflow, task_spec.id, args.force_review, args.num_envs)
            if args.publish_checkpoint:
                publish_pretrained_checkpoint(workflow, task_spec.id, args.force_publish)
    if args.list:
        writer = csv.writer(sys.stdout, quotechar='"', quoting=csv.QUOTE_MINIMAL)
        writer.writerows(summary_rows)
if __name__ == "__main__":
    # Run the main function; always close the simulation app afterwards.
    # A plain try/finally replaces the old `except Exception as e: raise e`,
    # which re-raised from this frame and obscured the traceback origin
    # without adding any handling.
    try:
        main()
    finally:
        # Close the app
        simulation_app.close()