Files
mindbot/scripts/environments/teleoperation/teleop_se3_agent.py
2026-03-15 20:23:21 +08:00

372 lines
12 KiB
Python

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to run teleoperation with Isaac Lab manipulation environments.
Supports multiple input devices (e.g., keyboard, spacemouse, gamepad) and devices
configured within the environment (including OpenXR-based hand tracking or motion
controllers)."""
"""Launch Isaac Sim Simulator first."""
import argparse
from collections.abc import Callable
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(
description="Teleoperation for Isaac Lab environments."
)
parser.add_argument(
"--num_envs", type=int, default=1, help="Number of environments to simulate."
)
parser.add_argument(
"--teleop_device",
type=str,
default="keyboard",
help=(
"Teleop device. Set here (legacy) or via the environment config. If using the environment config, pass the"
" device key/name defined under 'teleop_devices' (it can be a custom name, not necessarily 'handtracking')."
" Built-ins: keyboard, spacemouse, gamepad. Not all tasks support all built-ins."
),
)
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument(
"--sensitivity", type=float, default=1.0, help="Sensitivity factor."
)
parser.add_argument(
"--enable_pinocchio",
action="store_true",
default=False,
help="Enable Pinocchio.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
app_launcher_args = vars(args_cli)
if args_cli.enable_pinocchio:
# Import pinocchio before AppLauncher to force the use of the version installed by IsaacLab and
# not the one installed by Isaac Sim pinocchio is required by the Pink IK controllers and the
# GR1T2 retargeter
import pinocchio # noqa: F401
if "handtracking" in args_cli.teleop_device.lower():
app_launcher_args["xr"] = True
# launch omniverse app
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app
"""Rest everything follows."""
import logging
import gymnasium as gym
import torch
from isaaclab.devices import (
Se3Gamepad,
Se3GamepadCfg,
Se3Keyboard,
Se3KeyboardCfg,
Se3SpaceMouse,
Se3SpaceMouseCfg,
)
from isaaclab.devices.openxr import remove_camera_configs
from isaaclab.devices.teleop_device_factory import create_teleop_device
from isaaclab.envs import ManagerBasedRLEnvCfg
from isaaclab.managers import TerminationTermCfg as DoneTerm
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.lift import mdp
from isaaclab_tasks.utils import parse_env_cfg
if args_cli.enable_pinocchio:
import isaaclab_tasks.manager_based.locomanipulation.pick_place # noqa: F401
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
# =====================================================================
# Robot Camera Viewport Utilities (pure GUI — no --enable_cameras needed)
# =====================================================================
# Resolved prim paths for env_0.
# These match the camera prims embedded in mindrobot_cd2.usd.
# Run with the diagnostic block once to confirm exact paths.
_ROBOT_CAM_PRIMS: dict[str, str] = {
"Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand",
"Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand",
"Head": "/World/envs/env_0/Robot/robot_head/cam_head",
"Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest",
}
_robot_viewports: dict[str, object] = {}
def create_robot_viewports() -> None:
"""Create 4 viewport windows bound to robot camera prims (GUI only)."""
try:
import omni.kit.viewport.utility as vp_util
except ImportError:
logger.warning("[Viewport] omni.kit.viewport.utility not available.")
return
for name, cam_path in _ROBOT_CAM_PRIMS.items():
vp_win = vp_util.create_viewport_window(
f"Robot {name} View", width=640, height=360
)
vp_win.viewport_api.camera_path = cam_path
_robot_viewports[name] = vp_win
print(f"[INFO] Viewport 'Robot {name} View'{cam_path}")
def rebind_robot_viewports() -> None:
"""Re-bind viewports to camera paths after env reset."""
for name, vp_win in _robot_viewports.items():
vp_win.viewport_api.camera_path = _ROBOT_CAM_PRIMS[name]
# import logger
logger = logging.getLogger(__name__)
def main() -> None:
"""
Run teleoperation with an Isaac Lab manipulation environment.
Creates the environment, sets up teleoperation interfaces and callbacks,
and runs the main simulation loop until the application is closed.
Returns:
None
"""
# parse configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs
)
env_cfg.env_name = args_cli.task
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
raise ValueError(
"Teleoperation is only supported for ManagerBasedRLEnv environments. "
f"Received environment config type: {type(env_cfg).__name__}"
)
# modify configuration
env_cfg.terminations.time_out = None
if "Lift" in args_cli.task:
# set the resampling time range to large number to avoid resampling
env_cfg.commands.object_pose.resampling_time_range = (1.0e9, 1.0e9)
# add termination condition for reaching the goal otherwise the environment won't reset
env_cfg.terminations.object_reached_goal = DoneTerm(
func=mdp.object_reached_goal
)
if args_cli.xr:
env_cfg = remove_camera_configs(env_cfg)
env_cfg.sim.render.antialiasing_mode = "DLSS"
try:
# create environment
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
# check environment name (for reach , we don't allow the gripper)
if "Reach" in args_cli.task:
logger.warning(
f"The environment '{args_cli.task}' does not support gripper control. The device command will be"
" ignored."
)
except Exception as e:
logger.error(f"Failed to create environment: {e}")
simulation_app.close()
return
# Flags for controlling teleoperation flow
should_reset_recording_instance = False
teleoperation_active = True
# Callback handlers
def reset_recording_instance() -> None:
"""
Reset the environment to its initial state.
Sets a flag to reset the environment on the next simulation step.
Returns:
None
"""
nonlocal should_reset_recording_instance
should_reset_recording_instance = True
print("Reset triggered - Environment will reset on next step")
def start_teleoperation() -> None:
"""
Activate teleoperation control of the robot.
Enables the application of teleoperation commands to the environment.
Returns:
None
"""
nonlocal teleoperation_active
teleoperation_active = True
print("Teleoperation activated")
def stop_teleoperation() -> None:
"""
Deactivate teleoperation control of the robot.
Disables the application of teleoperation commands to the environment.
Returns:
None
"""
nonlocal teleoperation_active
teleoperation_active = False
print("Teleoperation deactivated")
# Create device config if not already in env_cfg
teleoperation_callbacks: dict[str, Callable[[], None]] = {
"R": reset_recording_instance,
"START": start_teleoperation,
"STOP": stop_teleoperation,
"RESET": reset_recording_instance,
}
# For hand tracking devices, add additional callbacks
if args_cli.xr:
# Default to inactive for hand tracking
teleoperation_active = False
else:
# Always active for other devices
teleoperation_active = True
# Create teleop device from config if present, otherwise create manually
teleop_interface = None
try:
if (
hasattr(env_cfg, "teleop_devices")
and args_cli.teleop_device in env_cfg.teleop_devices.devices
):
teleop_interface = create_teleop_device(
args_cli.teleop_device,
env_cfg.teleop_devices.devices,
teleoperation_callbacks,
)
else:
logger.warning(
f"No teleop device '{args_cli.teleop_device}' found in environment config. Creating default."
)
# Create fallback teleop device
sensitivity = args_cli.sensitivity
if args_cli.teleop_device.lower() == "keyboard":
teleop_interface = Se3Keyboard(
Se3KeyboardCfg(
pos_sensitivity=0.05 * sensitivity,
rot_sensitivity=0.05 * sensitivity,
)
)
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(
Se3SpaceMouseCfg(
pos_sensitivity=0.05 * sensitivity,
rot_sensitivity=0.05 * sensitivity,
)
)
elif args_cli.teleop_device.lower() == "gamepad":
teleop_interface = Se3Gamepad(
Se3GamepadCfg(
pos_sensitivity=0.1 * sensitivity,
rot_sensitivity=0.1 * sensitivity,
)
)
else:
logger.error(f"Unsupported teleop device: {args_cli.teleop_device}")
logger.error("Configure the teleop device in the environment config.")
env.close()
simulation_app.close()
return
# Add callbacks to fallback device
for key, callback in teleoperation_callbacks.items():
try:
teleop_interface.add_callback(key, callback)
except (ValueError, TypeError) as e:
logger.warning(f"Failed to add callback for key {key}: {e}")
except Exception as e:
logger.error(f"Failed to create teleop device: {e}")
env.close()
simulation_app.close()
return
if teleop_interface is None:
logger.error("Failed to create teleop interface")
env.close()
simulation_app.close()
return
print(f"Using teleop device: {teleop_interface}")
# reset environment
env.reset()
teleop_interface.reset()
# -- Diagnostic: print all Camera prims in the stage --
try:
import omni.usd
from pxr import UsdGeom
stage = omni.usd.get_context().get_stage()
print("\n[DIAG] All Camera prims in stage:")
for prim in stage.Traverse():
if UsdGeom.Camera(prim):
print(f" {prim.GetPath()}")
print("[DIAG] Done.\n")
except Exception as _diag_e:
print(f"[DIAG] Could not traverse stage: {_diag_e}")
# -- End diagnostic --
# Create viewport windows bound to robot cameras (no --enable_cameras needed)
create_robot_viewports()
print("Teleoperation started. Press 'R' to reset the environment.")
# simulate environment
while simulation_app.is_running():
try:
# run everything in inference mode
with torch.inference_mode():
# get device command
action = teleop_interface.advance()
# Only apply teleop commands when active
if teleoperation_active:
# process actions
actions = action.repeat(env.num_envs, 1)
# apply actions
env.step(actions)
else:
env.sim.render()
if should_reset_recording_instance:
env.reset()
teleop_interface.reset()
rebind_robot_viewports()
should_reset_recording_instance = False
print("Environment reset complete")
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
# close the simulator
env.close()
print("Environment closed")
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()