Files
mindbot/scripts/environments/teleoperation/teleop_xr_agent.py
yt lee 623e05f250 Add CloudXR VR streaming support for PICO 4 Ultra (Early Access)
Replaces manual H264/TCP stereo streaming with NVIDIA CloudXR for
higher-quality stereoscopic rendering and lower latency.

Changes:
- teleop_xr_agent.py: add --cloudxr flag (enables Isaac Sim XR mode,
  disables manual StreamingManager)
- deps/cloudxr/: NVIDIA CloudXR.js SDK (Early Access) with Isaac Lab
  teleop React web client
- deps/cloudxr/Dockerfile.wss.proxy: HAProxy WSS proxy for PICO 4 Ultra
  HTTPS mode (routes wss://48322 → ws://49100)
- deps/cloudxr/isaac/webpack.dev.js: disable file watching to avoid
  EMFILE errors with large node_modules
- deps/cloudxr/INSTALL.md: full setup guide

Usage:
  # Start CloudXR Runtime + Isaac Lab
  cd ~/IsaacLab && ./docker/container.py start \
      --files docker-compose.cloudxr-runtime.patch.yaml \
      --env-file .env.cloudxr-runtime

  # Run teleop with CloudXR
  ~/IsaacLab/isaaclab.sh -p teleop_xr_agent.py \
      --task Isaac-MindRobot-2i-DualArm-IK-Abs-v0 --cloudxr

  # Serve web client
  cd deps/cloudxr/isaac && npm run dev-server:https
2026-03-26 14:29:03 +08:00

132 lines
5.3 KiB
Python

#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# SPDX-License-Identifier: BSD-3-Clause
"""XR teleoperation entry point for MindBot.
Selects the appropriate agent class based on the task's action manager:
- DualArmHeadXrAgent (23D) for tasks with head_action (e.g. Isaac-MindRobot-2i-DualArm-IK-Abs-v0)
- DualArmXrAgent (20D) for dual-arm tasks without head
- SingleArmXrAgent (8D) for single-arm tasks (e.g. Isaac-MindRobot-LeftArm-IK-Abs-v0)
Usage:
~/IsaacLab/isaaclab.sh -p scripts/environments/teleoperation/teleop_xr_agent.py \\
--task Isaac-MindRobot-2i-DualArm-IK-Abs-v0 --stream-to <PICO_IP>
"""
import argparse
import logging
import sys
import os
# Ensure xr_utils (next to this script) is importable
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from isaaclab.app import AppLauncher
logger = logging.getLogger(__name__)
# -- CLI arguments --
parser = argparse.ArgumentParser(
description="Teleoperation for Isaac Lab environments with PICO XR Controller (absolute IK mode)."
)
parser.add_argument("--num_envs", type=int, default=1)
parser.add_argument("--task", type=str, default="Isaac-MindRobot-LeftArm-IK-Abs-v0")
parser.add_argument("--sensitivity", type=float, default=None)
parser.add_argument("--pos_sensitivity", type=float, default=None)
parser.add_argument("--rot_sensitivity", type=float, default=None)
parser.add_argument("--arm", type=str, default="left", choices=["left", "right"])
parser.add_argument("--base_speed", type=float, default=5.0)
parser.add_argument("--base_turn", type=float, default=2.0)
parser.add_argument("--drive_speed", type=float, default=0.5)
parser.add_argument("--drive_turn", type=float, default=1.5)
parser.add_argument("--stream-to", type=str, default=None, dest="stream_to")
parser.add_argument("--stream-port", type=int, default=12345, dest="stream_port")
parser.add_argument("--stream-bitrate", type=int, default=20_000_000, dest="stream_bitrate")
parser.add_argument("--debug-viewports", action="store_true", dest="debug_viewports", default=True)
parser.add_argument(
"--cloudxr", action="store_true", dest="cloudxr", default=False,
help="Enable CloudXR streaming mode (replaces --stream-to). "
"Requires CloudXR Runtime Docker container running on this workstation.",
)
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
app_launcher_args = vars(args_cli)
# Enable Isaac Sim XR extensions when using CloudXR; disable for normal streaming
app_launcher_args["xr"] = args_cli.cloudxr
app_launcher_args["enable_cameras"] = not args_cli.cloudxr # CloudXR renders its own stereo views
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app
# -- Post-launch imports (require Isaac Sim runtime) --
import gymnasium as gym
from isaaclab.envs import ManagerBasedRLEnvCfg
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
from xr_teleop import SingleArmXrAgent, DualArmXrAgent, DualArmHeadXrAgent
def main() -> None:
args = args_cli
# Resolve sensitivity
pos_sens = args.pos_sensitivity or args.sensitivity or 1.0
rot_sens = args.rot_sensitivity or args.sensitivity or 0.3
# Create environment
env_cfg = parse_env_cfg(args.task, num_envs=args.num_envs)
env_cfg.env_name = args.task
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
raise ValueError(f"Teleoperation requires ManagerBasedRLEnvCfg. Got: {type(env_cfg)}")
if "Abs" not in args.task:
raise ValueError(f"Task '{args.task}' is not an absolute IK task.")
env_cfg.terminations.time_out = None
env = gym.make(args.task, cfg=env_cfg).unwrapped
# Select agent based on task capabilities
action_terms = env.action_manager._terms
is_dual_arm = "left_arm_action" in action_terms
has_head = "head_action" in action_terms
if is_dual_arm and has_head:
print(f"[INFO] DualArmHeadXrAgent (23D) | pos_sens={pos_sens} rot_sens={rot_sens}")
agent = DualArmHeadXrAgent(
env, simulation_app,
pos_sensitivity=pos_sens, rot_sensitivity=rot_sens,
base_speed=args.base_speed, base_turn=args.base_turn,
drive_speed=args.drive_speed, drive_turn=args.drive_turn,
stream_to=None if args.cloudxr else args.stream_to,
stream_port=args.stream_port,
stream_bitrate=args.stream_bitrate,
debug_viewports=args.debug_viewports or bool(args.stream_to),
)
elif is_dual_arm:
print(f"[INFO] DualArmXrAgent (20D) | pos_sens={pos_sens} rot_sens={rot_sens}")
agent = DualArmXrAgent(
env, simulation_app,
pos_sensitivity=pos_sens, rot_sensitivity=rot_sens,
base_speed=args.base_speed, base_turn=args.base_turn,
drive_speed=args.drive_speed, drive_turn=args.drive_turn,
debug_viewports=args.debug_viewports,
)
else:
print(f"[INFO] SingleArmXrAgent (8D) | arm={args.arm} pos_sens={pos_sens} rot_sens={rot_sens}")
agent = SingleArmXrAgent(
env, simulation_app,
arm=args.arm,
pos_sensitivity=pos_sens, rot_sensitivity=rot_sens,
debug_viewports=args.debug_viewports,
)
agent.run()
if __name__ == "__main__":
main()
simulation_app.close()