3 Commits

Author SHA1 Message Date
eef7ff838d Fix PrismaticJoint (trunk) control: IdealPDActuator with high stiffness
Replace ImplicitActuator (which doesn't work for prismatic joints) with
IdealPDActuator. This bypasses PhysX drive and explicitly computes PD forces
using set_dof_actuation_forces().

Changes:
- stiffness=100k, damping=5k to resist arm reaction forces (~150N) with <2mm deflection
- Read trunk_cmd from ArticulationCfg initial position (single source of truth)
- Integrate trunk diagnostics into robot state output
- Remove hardcoded trunk_target parameter
2026-03-25 16:32:52 +08:00
bfe28b188a Fix XR teleop: body-frame IK control for mobile chassis
Switch arm IK from world-frame to body-frame control so that
pushing the XR controller forward always moves the arm in the
robot's forward direction, regardless of chassis rotation.

Key changes:
- dual_arm_agent: convert EEF observations to body frame before
  passing to XR controller; send body-frame IK targets directly
  (removed convert_action_world_to_root)
- xr_controller: XR deltas treated as body-frame deltas (no yaw
  rotation needed — VR view tracks robot heading naturally)
- streaming: add debug frame save for stereo alignment diagnostics
- mindrobot_2i_cfg: IdealPDActuator for trunk, disabled gravity
- Author headers updated to Yutang Li, SIAT

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 11:32:28 +08:00
05a146bca6 重构xr teleop 2026-03-23 22:06:13 +08:00
22 changed files with 2411 additions and 763 deletions

105
CLAUDE.md
View File

@@ -1,15 +1,16 @@
# MindBot Project
基于 Isaac Lab 的移动操作机器人MindRobot仿真学习框架支持强化学习RL模仿学习IL
基于 Isaac Lab 的移动操作机器人MindRobot仿真学习框架支持强化学习RL模仿学习IL和 XR 遥操作
## 环境要求
- **Isaac Sim**: 5.1.0
- **Python**: 3.11
- **GPU**: NVIDIA RTX PRO 6000 96GB开发机推荐开启 DLSS
- **依赖**: isaaclab, isaaclab_assets, isaaclab_mimic, isaaclab_rl, isaaclab_tasks
- **Conda 环境**
- `env_isaaclab` — 主开发环境
- `xr`另一个同样包含 isaaclab/isaacsim 的环境,两者均可运行 Isaac Lab 脚本
- `xr`XR 遥操作 + 仿真推流环境,包含 isaaclab/isaacsim + PyAV
**IMPORTANT**: 所有 Isaac Lab 脚本必须通过 `~/IsaacLab/isaaclab.sh -p` 启动,直接用 `python` 会报 `KeyError: 'EXP_PATH'`(该变量只有 `isaaclab.sh` 会在启动时设置)。
@@ -43,8 +44,19 @@ python scripts/rsl_rl/play.py --task Template-Mindbot-v0
# 遥操作Spacemouse / 键盘)
python scripts/environments/teleoperation/teleop_se3_agent.py --task Isaac-MindRobot-LeftArm-IK-Rel-v0
# 遥操作XR 设备
python scripts/environments/teleoperation/teleop_xr_agent.py --task Isaac-MindRobot-LeftArm-IK-Rel-v0
# 遥操作XR 双臂 + 头部控制
python scripts/environments/teleoperation/teleop_xr_agent.py --task Isaac-MindRobot-2i-DualArm-IK-Abs-v0
# 遥操作XR 双臂,无头部)
python scripts/environments/teleoperation/teleop_xr_agent.py --task Isaac-MindRobot-DualArm-IK-Abs-v0
# 遥操作 + 仿真画面推送到 PICO VR需要 --enable_cameras
python scripts/environments/teleoperation/teleop_xr_agent.py \
--task Isaac-MindRobot-2i-DualArm-IK-Abs-v0 \
--stream-to <PICO_IP> --enable_cameras
# 推流测试(不需要 Isaac Lab用 xr 环境直接运行)
conda run -n xr python scripts/test_sim_streamer.py --host <PICO_IP>
# 代码格式化
pre-commit run --all-files
@@ -55,7 +67,9 @@ pre-commit run --all-files
```
source/mindbot/mindbot/
├── robot/mindbot.py # MindRobot 关节/执行器配置MINDBOT_CFG
├── utils/assets.py # USD 资源路径MINDBOT_ASSETS_DIR
├── utils/
│ ├── assets.py # USD 资源路径MINDBOT_ASSETS_DIR
│ └── sim_video_streamer.py # 仿真→VR H264推流模块PyAV编码 + TCP
└── tasks/manager_based/
├── rl/ # 强化学习任务
│ ├── mindbot/ # Template-Mindbot-v0抓取
@@ -64,19 +78,75 @@ source/mindbot/mindbot/
│ └── pullUltrasoundLidUp/ # Pull-Ultrasound-Lid-Up-v0
└── il/ # 模仿学习任务
├── demo/ # Demo 任务H1、Franka
└── open_drybox/ # Isaac-MindRobot-LeftArm-IK-{Rel,Abs}-v0
└── open_drybox/ # 双臂遥操作任务
scripts/environments/teleoperation/
├── teleop_xr_agent.py # PICO XR 遥操作(双臂/单臂,支持仿真推流)
├── teleop_se3_agent.py # Spacemouse/键盘 遥操作
└── test_sim_streamer.py # 推流独立测试(红蓝测试图案)
deps/
├── XRoboToolkit-Teleop-Sample-Python/ # XR SDK Python 封装
├── XRoboToolkit-RobotVision-PC/ # 物理ZED相机采集+H264推流C++
└── zed-isaac-sim/ # ZED 仿真扩展Isaac Sim OGN
```
## 注册任务列表
| 任务 ID | 类型 | 说明 |
|---|---|---|
| `Template-Mindbot-v0` | RL | MindRobot 抓取 |
| `Template-centrifuge-lidup-v0` | RL | 离心机旋转 |
| `Template-Pull-v0` | RL | 物体拉取 |
| `Pull-Ultrasound-Lid-Up-v0` | RL | 超声机盖拉取 |
| `Isaac-MindRobot-LeftArm-IK-Rel-v0` | IL | 左臂 IK 相对控制 |
| `Isaac-MindRobot-LeftArm-IK-Abs-v0` | IL | 左臂 IK 绝对控制 |
| 任务 ID | 类型 | Action维度 | 说明 |
|---|---|---|---|
| `Template-Mindbot-v0` | RL | - | MindRobot 抓取 |
| `Template-centrifuge-lidup-v0` | RL | - | 离心机旋转 |
| `Template-Pull-v0` | RL | - | 物体拉取 |
| `Pull-Ultrasound-Lid-Up-v0` | RL | - | 超声机盖拉取 |
| `Isaac-MindRobot-LeftArm-IK-Rel-v0` | IL | 12D | 左臂 IK 相对控制 |
| `Isaac-MindRobot-LeftArm-IK-Abs-v0` | IL | 12D | 左臂 IK 绝对控制 |
| `Isaac-MindRobot-DualArm-IK-Abs-v0` | IL | 20D | 双臂 IK无头部 |
| `Isaac-MindRobot-2i-DualArm-IK-Abs-v0` | IL | 22D | 双臂 IK + 头部带VR推流相机 |
## XR 遥操作
### teleop_xr_agent.py 功能
- **双臂绝对 IK 控制**:左右 PICO 手柄分别控制左右机械臂
- **Grip 按钮**:按住移动手臂,松开锁定关节(缓存 root-frame IK 命令)
- **左摇杆**:底盘前后/转向(直接速度控制,绕过摩擦限制)
- **左手 Y 键**:重置环境(右手 B 键不触发重置)
- **头部追踪**HMD 偏航/俯仰映射到机器人头部关节(仅 2i 任务)
- **`--stream-to`**:将仿真立体相机画面实时推送到 PICO VR 显示
### 仿真→VR 推流架构
```
Isaac Lab env.step()
→ vr_left_eye / vr_right_eye (CameraCfg, 1280x720)
→ .cpu().numpy() (GPU→CPU)
→ SimVideoStreamer (后台线程: np.concatenate SBS → PyAV H264 → TCP)
→ PICO Unity Client (TCP Server :12345, H264解码 + VR双眼显示)
```
- TCP 协议4字节大端长度头 + H264 NALU与 RobotVisionConsole 相同)
- Unity Client 零修改,选择 SIM-Stereo 或 ZEDMINI 配置均可
- PICO 端 `video_source.yml` 通过 adb push 热更新:
```bash
adb push Assets/StreamingAssets/video_source.yml \
/sdcard/Android/data/com.xrobotoolkit.client/files/video_source.yml
adb shell am force-stop com.xrobotoolkit.client
```
### 性能优化
- **decimation=4**25Hz 控制频率,平衡 FPS 和响应性)
- 推流时可关闭调试 viewport 节省 GPU`--stream-to` 默认关,`--debug-viewports` 开)
- Isaac Sim 开启 **DLSS + Frame Generation** 提升渲染帧率
- 编码在后台线程异步执行,不阻塞主仿真循环
### ZED 2i 物理相机的 IPD 问题
ZED 2i 基线 120mm >> 人眼 IPD 65mm直接在 VR 中显示会导致所有距离视差过大无法融合3D。
- Shader `stereoOffset` 只是 toe-in 矫正,只对一个固定距离有效
- PC 端深度矫正DIBR理论正确但 artifact 严重
- **根本解决**:在仿真中用 65mm 基线虚拟相机,或使用 ZED Mini63mm ≈ 人眼 IPD
## 新增任务的规范
@@ -119,12 +189,13 @@ tasks/manager_based/il/<task_name>/
| 左夹爪 | left_hand_joint_left/right |
| 右夹爪 | right_hand_joint_left/right |
| 躯干升降 | PrismaticJoint |
| 头部 | head_revoluteJoint |
| 头部 | head_revoluteJoint, head_pitch_Joint |
| 底盘轮子 | front/back revolute joints |
## 遥操作数据收集
- Isaac Lab 内遥操作脚本使用 `env_isaaclab` 环境
- XR 设备接入依赖 `deps/XRoboToolkit-Teleop-Sample-Python/`,需切换到 `xr` 环境
- Isaac Lab 内遥操作脚本使用 `xr` 环境(包含 isaaclab + XR SDK
- XR 设备接入依赖 `deps/XRoboToolkit-Teleop-Sample-Python/`
- 示教数据格式兼容 LeRobot / RoboMimic
- 多相机配置cam_head, cam_chest, cam_left_hand, cam_right_hand, cam_top, cam_side
- VR 立体相机vr_left_eye, vr_right_eyeZED_X CameraLeft/CameraRight

1
deps/XRoboToolkit-Unity-Client vendored Submodule

Submodule deps/XRoboToolkit-Unity-Client added at c9326092ff

View File

@@ -1,53 +1,49 @@
#!/usr/bin/env python3
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to run teleoperation with Isaac Lab manipulation environments using PICO XR Controllers.
"""XR teleoperation entry point for MindBot.
Only absolute IK mode is supported (Isaac-MindRobot-LeftArm-IK-Abs-v0).
The controller computes delta pose in Isaac Sim world frame and accumulates
an absolute EEF target, which is then converted to robot root frame before
being sent to the DifferentialIK action manager.
Selects the appropriate agent class based on the task's action manager:
- DualArmHeadXrAgent (23D) for tasks with head_action (e.g. Isaac-MindRobot-2i-DualArm-IK-Abs-v0)
- DualArmXrAgent (20D) for dual-arm tasks without head
- SingleArmXrAgent (8D) for single-arm tasks (e.g. Isaac-MindRobot-LeftArm-IK-Abs-v0)
Usage:
~/IsaacLab/isaaclab.sh -p scripts/environments/teleoperation/teleop_xr_agent.py \\
--task Isaac-MindRobot-2i-DualArm-IK-Abs-v0 --stream-to <PICO_IP>
"""
import argparse
import logging
import sys
import os
from collections.abc import Callable
# Ensure xr_utils (next to this script) is importable when running directly
# Ensure xr_utils (next to this script) is importable
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from isaaclab.app import AppLauncher
logger = logging.getLogger(__name__)
# add argparse arguments
# -- CLI arguments --
parser = argparse.ArgumentParser(
description="Teleoperation for Isaac Lab environments with PICO XR Controller (absolute IK mode only)."
description="Teleoperation for Isaac Lab environments with PICO XR Controller (absolute IK mode)."
)
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument(
"--task",
type=str,
default="Isaac-MindRobot-LeftArm-IK-Abs-v0",
help="Name of the task (must be an Abs IK task).",
)
parser.add_argument(
"--sensitivity", type=float, default=None,
help="Set both pos and rot sensitivity (overridden by --pos_sensitivity/--rot_sensitivity).",
)
parser.add_argument("--pos_sensitivity", type=float, default=None, help="Position sensitivity. Default: 1.0.")
parser.add_argument("--rot_sensitivity", type=float, default=None, help="Rotation sensitivity. Default: 0.3.")
parser.add_argument("--arm", type=str, default="left", choices=["left", "right"], help="Which arm/controller to use.")
parser.add_argument("--base_speed", type=float, default=5.0, help="Max wheel speed (rad/s) for joystick full forward.")
parser.add_argument("--base_turn", type=float, default=2.0, help="Max wheel differential (rad/s) for joystick full left/right.")
parser.add_argument("--drive_speed", type=float, default=0.5, help="Max robot linear speed (m/s) for direct base control. Default: 0.5")
parser.add_argument("--drive_turn", type=float, default=1.5, help="Max robot yaw rate (rad/s) for direct base control. Default: 1.5")
parser.add_argument("--num_envs", type=int, default=1)
parser.add_argument("--task", type=str, default="Isaac-MindRobot-LeftArm-IK-Abs-v0")
parser.add_argument("--sensitivity", type=float, default=None)
parser.add_argument("--pos_sensitivity", type=float, default=None)
parser.add_argument("--rot_sensitivity", type=float, default=None)
parser.add_argument("--arm", type=str, default="left", choices=["left", "right"])
parser.add_argument("--base_speed", type=float, default=5.0)
parser.add_argument("--base_turn", type=float, default=2.0)
parser.add_argument("--drive_speed", type=float, default=0.5)
parser.add_argument("--drive_turn", type=float, default=1.5)
parser.add_argument("--stream-to", type=str, default=None, dest="stream_to")
parser.add_argument("--stream-port", type=int, default=12345, dest="stream_port")
parser.add_argument("--stream-bitrate", type=int, default=20_000_000, dest="stream_bitrate")
parser.add_argument("--debug-viewports", action="store_true", dest="debug_viewports", default=True)
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
@@ -57,726 +53,69 @@ app_launcher_args["xr"] = False
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app
"""Rest everything follows."""
# -- Post-launch imports (require Isaac Sim runtime) --
import gymnasium as gym
import numpy as np
import torch
from scipy.spatial.transform import Rotation as R
from isaaclab.envs import ManagerBasedRLEnvCfg
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
from xr_utils import XrClient, transform_xr_pose, quat_diff_as_rotvec_xyzw, is_valid_quaternion
from xr_utils.geometry import R_HEADSET_TO_WORLD
from xr_teleop import SingleArmXrAgent, DualArmXrAgent, DualArmHeadXrAgent
# =====================================================================
# Robot Camera Viewport Utilities
# =====================================================================
# Resolved prim paths for env_0 — must match CameraCfg prim_path with
# {ENV_REGEX_NS} → /World/envs/env_0
_ROBOT_CAM_PRIMS: dict[str, str] = {
"Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand",
"Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand",
"Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft",
"Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest",
}
# Stores created viewport window objects
_robot_viewports: dict[str, object] = {}
def create_robot_viewports() -> None:
"""Create 4 viewport windows and bind each to a robot camera prim.
Must be called after env.reset() so all prims exist on the USD stage.
"""
try:
import omni.kit.viewport.utility as vp_util
except ImportError:
logger.warning("[Viewport] omni.kit.viewport.utility not available — skipping viewport creation.")
return
for name, cam_path in _ROBOT_CAM_PRIMS.items():
vp_win = vp_util.create_viewport_window(
f"Robot {name} View", width=640, height=360
)
vp_win.viewport_api.camera_path = cam_path
_robot_viewports[name] = vp_win
print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}")
def rebind_robot_viewports() -> None:
"""Re-bind all robot viewports to their camera paths.
Call this after every env reset so viewports stay locked to the cameras.
"""
for name, vp_win in _robot_viewports.items():
cam_path = _ROBOT_CAM_PRIMS[name]
vp_win.viewport_api.camera_path = cam_path
# =====================================================================
# Helpers
# =====================================================================
def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
"""Convert Isaac-style wxyz quaternion to scipy Rotation."""
return R.from_quat([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])
def _rotation_to_quat_wxyz(rot: R) -> np.ndarray:
"""Convert scipy Rotation to Isaac-style wxyz quaternion."""
q = rot.as_quat()
return np.array([q[3], q[0], q[1], q[2]])
def world_pose_to_root_frame(
pos_w: np.ndarray,
quat_wxyz: np.ndarray,
root_pos_w: np.ndarray,
root_quat_wxyz: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Express a world-frame pose in the robot root frame."""
root_rot = _quat_wxyz_to_rotation(root_quat_wxyz)
pose_rot = _quat_wxyz_to_rotation(quat_wxyz)
pos_root = root_rot.inv().apply(pos_w - root_pos_w)
quat_root = _rotation_to_quat_wxyz(root_rot.inv() * pose_rot)
return pos_root, quat_root
# =====================================================================
# Teleoperation Controller
# =====================================================================
class XrTeleopController:
"""Teleop controller for PICO XR headset (absolute IK mode).
Accumulates an absolute EEF target in Isaac Sim world frame.
Grip button acts as clutch; B/Y buttons trigger environment reset.
"""
def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3,
max_pos_per_step=0.05, max_rot_per_step=0.08, xr_client=None):
self.xr_client = xr_client if xr_client is not None else XrClient()
self._owns_client = xr_client is None # only close if we created it
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
# Hard caps per physics step to prevent IK divergence.
# max_rot_per_step ~4.6°, max_pos_per_step 5 cm.
self.max_pos_per_step = max_pos_per_step
self.max_rot_per_step = max_rot_per_step
self.arm = arm
self.controller_name = "left_controller" if arm == "left" else "right_controller"
self.grip_name = "left_grip" if arm == "left" else "right_grip"
self.trigger_name = "left_trigger" if arm == "left" else "right_trigger"
self.R_headset_world = R_HEADSET_TO_WORLD
self.prev_xr_pos = None
self.prev_xr_quat = None
self.target_eef_pos = None # world frame
self.target_eef_quat = None # world frame, wxyz
self.grip_active = False
self.frame_count = 0
self.reset_button_latched = False
self.require_grip_reengage = False
self.grip_engage_threshold = 0.8
self.grip_release_threshold = 0.2
self.callbacks = {}
def add_callback(self, name: str, func: Callable):
self.callbacks[name] = func
def reset(self):
self.prev_xr_pos = None
self.prev_xr_quat = None
self.grip_active = False
self.frame_count = 0
self.target_eef_pos = None
self.target_eef_quat = None
# Require grip release after reset to avoid driving to stale pose.
self.require_grip_reengage = True
def close(self):
if self._owns_client:
self.xr_client.close()
def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Return 8D hold action [x, y, z, qw, qx, qy, qz, gripper] at frozen target pose."""
action = torch.zeros(8)
if self.target_eef_pos is not None and self.target_eef_quat is not None:
# Prefer frozen world-frame target so IK holds a fixed world position
# even if the robot chassis drifts slightly under gravity.
action[:3] = torch.tensor(self.target_eef_pos.copy())
action[3:7] = torch.tensor(self.target_eef_quat.copy())
elif current_eef_pos is not None and current_eef_quat is not None:
action[:3] = torch.tensor(current_eef_pos.copy())
action[3:7] = torch.tensor(current_eef_quat.copy())
else:
action[3] = 1.0 # identity quaternion
action[7] = 1.0 if trigger > 0.5 else -1.0
return action
def advance(self, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Read XR controller and return 8D absolute action [x, y, z, qw, qx, qy, qz, gripper].
Position and quaternion are in Isaac Sim world frame.
Caller must convert to robot root frame before sending to IK.
"""
try:
reset_pressed = self.xr_client.get_button("B") or self.xr_client.get_button("Y")
if reset_pressed and not self.reset_button_latched:
if "RESET" in self.callbacks:
self.callbacks["RESET"]()
self.reset_button_latched = reset_pressed
except Exception:
pass
try:
raw_pose = self.xr_client.get_pose(self.controller_name)
grip = self.xr_client.get_key_value(self.grip_name)
trigger = self.xr_client.get_key_value(self.trigger_name)
except Exception:
return self.get_zero_action(0.0, current_eef_pos, current_eef_quat)
if not is_valid_quaternion(raw_pose[3:]):
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Wait for grip release after reset
if self.require_grip_reengage:
if grip <= self.grip_release_threshold:
self.require_grip_reengage = False
else:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Grip button as clutch with hysteresis
grip_pressed = grip > self.grip_release_threshold if self.grip_active else grip >= self.grip_engage_threshold
if not grip_pressed:
self.prev_xr_pos = None
self.prev_xr_quat = None
# Only snapshot target on the transition frame (grip just released) or first ever frame.
# After that, keep it frozen so IK holds a fixed world-frame position.
if self.grip_active or self.target_eef_pos is None:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = False
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
if not self.grip_active:
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = True
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Project current and previous XR poses into Isaac Sim world frame
xr_world_pos, xr_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
prev_xr_world_pos, prev_xr_world_quat_xyzw = transform_xr_pose(self.prev_xr_pos, self.prev_xr_quat)
# Delta position (world frame), clamped per step
world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity
pos_norm = np.linalg.norm(world_delta_pos)
if pos_norm > self.max_pos_per_step:
world_delta_pos *= self.max_pos_per_step / pos_norm
# Delta rotation (world frame), clamped per step
world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity
rot_norm = np.linalg.norm(world_delta_rot)
if rot_norm > self.max_rot_per_step:
world_delta_rot *= self.max_rot_per_step / rot_norm
gripper_action = 1.0 if trigger > 0.5 else -1.0
# Accumulate absolute target
if self.target_eef_pos is None:
self.target_eef_pos = np.zeros(3)
self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])
self.target_eef_pos += world_delta_pos
# Apply rotation delta in world frame so controller direction = EEF world direction
target_r = _quat_wxyz_to_rotation(self.target_eef_quat)
self.target_eef_quat = _rotation_to_quat_wxyz(R.from_rotvec(world_delta_rot) * target_r)
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
self.frame_count += 1
action = torch.tensor([
*self.target_eef_pos,
*self.target_eef_quat,
gripper_action,
], dtype=torch.float32)
if self.frame_count % 30 == 0:
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
print("\n====================== [VR TELEOP DEBUG] ======================")
print(f"| Raw VR Pos (OpenXR): {np.array(raw_pose[:3])}")
print(f"| Raw VR Quat (xyzw): {np.array(raw_pose[3:])}")
print(f"| XR Delta Pos (world): {world_delta_pos} (norm={np.linalg.norm(world_delta_pos):.4f})")
print(f"| XR Delta Rot (world): {world_delta_rot} (norm={np.linalg.norm(world_delta_rot):.4f})")
print(f"| Targ Pos (world): {action[:3].numpy()}")
print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}")
print(f"| Gripper & Trigger: Grip={grip:.2f}, Trig={trigger:.2f}")
print("===============================================================")
return action
# =====================================================================
# Head Tracking Utility
# =====================================================================
def get_head_joint_targets(
xr_client, neutral_rot: R | None
) -> tuple[np.ndarray, R | None]:
"""Extract head joint targets [yaw, pitch] from HMD pose.
At first call (neutral_rot is None) the current headset orientation
is recorded as the neutral reference and zeros are returned.
Subsequent calls return yaw/pitch relative to that neutral pose.
Returns:
(joint_targets, neutral_rot) where joint_targets is shape (2,)
float32 [head_yaw_Joint_rad, head_pitch_Joint_rad].
"""
try:
raw_pose = xr_client.get_pose("headset")
if not is_valid_quaternion(raw_pose[3:]):
return np.zeros(2, dtype=np.float32), neutral_rot
_, head_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
head_rot = R.from_quat(head_world_quat_xyzw)
if neutral_rot is None:
# First call — record neutral and hold zero
return np.zeros(2, dtype=np.float32), head_rot
# Relative rotation from neutral heading
rel_rot = head_rot * neutral_rot.inv()
# ZYX Euler: [0]=yaw (Z), [1]=pitch (Y)
euler_zyx = rel_rot.as_euler("ZYX")
yaw = float(np.clip(euler_zyx[0], -1.57, 1.57)) # ±90°
pitch = float(np.clip(euler_zyx[1], -0.52, 0.52)) # ±30°
return np.array([yaw, pitch], dtype=np.float32), neutral_rot
except Exception:
return np.zeros(2, dtype=np.float32), neutral_rot
# =====================================================================
# Main Execution Loop
# =====================================================================
def main() -> None:
"""Run teleoperation with PICO XR Controller against Isaac Lab environment."""
args = args_cli
env_cfg = parse_env_cfg(args_cli.task, num_envs=args_cli.num_envs)
env_cfg.env_name = args_cli.task
# Resolve sensitivity
pos_sens = args.pos_sensitivity or args.sensitivity or 1.0
rot_sens = args.rot_sensitivity or args.sensitivity or 0.3
# Create environment
env_cfg = parse_env_cfg(args.task, num_envs=args.num_envs)
env_cfg.env_name = args.task
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
raise ValueError(f"Teleoperation requires ManagerBasedRLEnvCfg. Got: {type(env_cfg)}")
if "Abs" not in args_cli.task:
raise ValueError(
f"Task '{args_cli.task}' is not an absolute IK task. "
"Only Abs tasks are supported (e.g. Isaac-MindRobot-LeftArm-IK-Abs-v0)."
)
if "Abs" not in args.task:
raise ValueError(f"Task '{args.task}' is not an absolute IK task.")
env_cfg.terminations.time_out = None
try:
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
except Exception as e:
logger.error(f"Failed to create environment '{args_cli.task}': {e}")
simulation_app.close()
return
env = gym.make(args.task, cfg=env_cfg).unwrapped
# Detect single-arm vs dual-arm based on action manager term names
is_dual_arm = "left_arm_action" in env.action_manager._terms
# Select agent based on task capabilities
action_terms = env.action_manager._terms
is_dual_arm = "left_arm_action" in action_terms
has_head = "head_action" in action_terms
# Sensitivity: explicit args override --sensitivity, which overrides defaults
pos_sens = 1.0
rot_sens = 0.3
if args_cli.sensitivity is not None:
pos_sens = args_cli.sensitivity
rot_sens = args_cli.sensitivity
if args_cli.pos_sensitivity is not None:
pos_sens = args_cli.pos_sensitivity
if args_cli.rot_sensitivity is not None:
rot_sens = args_cli.rot_sensitivity
if is_dual_arm:
print(f"\n[INFO] Dual-arm mode detected. Using both controllers.")
print(f"[INFO] IK Mode: ABSOLUTE")
print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}")
shared_client = XrClient()
teleop_left = XrTeleopController(arm="left", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client)
teleop_right = XrTeleopController(arm="right", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client)
teleop_interface = teleop_left # primary interface for callbacks/reset
teleop_right_ref = teleop_right
if is_dual_arm and has_head:
print(f"[INFO] DualArmHeadXrAgent (23D) | pos_sens={pos_sens} rot_sens={rot_sens}")
agent = DualArmHeadXrAgent(
env, simulation_app,
pos_sensitivity=pos_sens, rot_sensitivity=rot_sens,
base_speed=args.base_speed, base_turn=args.base_turn,
drive_speed=args.drive_speed, drive_turn=args.drive_turn,
stream_to=args.stream_to, stream_port=args.stream_port,
stream_bitrate=args.stream_bitrate,
debug_viewports=args.debug_viewports or bool(args.stream_to),
)
elif is_dual_arm:
print(f"[INFO] DualArmXrAgent (20D) | pos_sens={pos_sens} rot_sens={rot_sens}")
agent = DualArmXrAgent(
env, simulation_app,
pos_sensitivity=pos_sens, rot_sensitivity=rot_sens,
base_speed=args.base_speed, base_turn=args.base_turn,
drive_speed=args.drive_speed, drive_turn=args.drive_turn,
debug_viewports=args.debug_viewports,
)
else:
print(f"\n[INFO] Connecting to PICO XR Headset using {args_cli.arm} controller...")
print(f"[INFO] IK Mode: ABSOLUTE")
print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}")
teleop_interface = XrTeleopController(
arm=args_cli.arm,
pos_sensitivity=pos_sens,
rot_sensitivity=rot_sens,
print(f"[INFO] SingleArmXrAgent (8D) | arm={args.arm} pos_sens={pos_sens} rot_sens={rot_sens}")
agent = SingleArmXrAgent(
env, simulation_app,
arm=args.arm,
pos_sensitivity=pos_sens, rot_sensitivity=rot_sens,
debug_viewports=args.debug_viewports,
)
should_reset = False
def request_reset():
nonlocal should_reset
should_reset = True
print("[INFO] Reset requested via XR button.")
teleop_interface.add_callback("RESET", request_reset)
def clear_ik_target_state():
"""Sync IK controller's internal target to current EEF pose to avoid jumps after reset."""
if is_dual_arm:
for term_key in ["left_arm_action", "right_arm_action"]:
arm_action_term = env.action_manager._terms[term_key]
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
else:
arm_action_term = env.action_manager._terms["arm_action"]
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
def convert_action_world_to_root(action_tensor: torch.Tensor) -> torch.Tensor:
"""Convert absolute IK action from world frame to robot root frame."""
robot = env.unwrapped.scene["robot"]
root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy()
root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy()
pos_root, quat_root = world_pose_to_root_frame(
action_tensor[:3].numpy(), action_tensor[3:7].numpy(),
root_pos_w, root_quat_w,
)
out = action_tensor.clone()
out[:3] = torch.tensor(pos_root, dtype=torch.float32)
out[3:7] = torch.tensor(quat_root, dtype=torch.float32)
return out
def get_chassis_commands() -> tuple[torch.Tensor, float, float]:
"""Read left joystick → (wheel_cmd 4D, v_fwd m/s, omega rad/s).
wheel_cmd : differential wheel velocities for the action vector (training data).
v_fwd : desired robot forward speed in m/s (for direct base control).
omega : desired robot yaw rate in rad/s (for direct base control).
"""
try:
joy = teleop_interface.xr_client.get_joystick("left")
jy = float(joy[1])
jx = float(joy[0])
except Exception:
return torch.zeros(4), 0.0, 0.0
# Wheel velocity commands for action vector (rad/s)
v_w = jy * args_cli.base_speed
omega_w = jx * args_cli.base_turn
right_vel = v_w - omega_w
left_vel = v_w + omega_w
wheel_cmd = torch.tensor([right_vel, left_vel, left_vel, right_vel], dtype=torch.float32)
if abs(v_w) > 0.1 or abs(omega_w) > 0.1:
print(f"[WHEEL] joy=({jx:+.2f},{jy:+.2f}) v_wheel={v_w:.2f} ω_wheel={omega_w:.2f}{wheel_cmd.tolist()}")
# Direct base velocity commands (m/s, rad/s)
v_fwd = jy * args_cli.drive_speed
omega = -jx * args_cli.drive_turn # jx<0 (left push) → positive yaw = left turn
return wheel_cmd, v_fwd, omega
def apply_base_velocity(v_fwd: float, omega: float) -> None:
"""Directly set robot root linear+angular velocity to bypass isotropic friction.
PhysX uses isotropic friction, so skid-steer lateral slip is impossible through
wheel torques alone. This function overrides the base velocity each step to give
clean translational + rotational motion regardless of friction.
The write is buffered and applied at the start of the next env.step().
"""
if abs(v_fwd) < 1e-4 and abs(omega) < 1e-4:
return
robot = env.unwrapped.scene["robot"]
rq = robot.data.root_quat_w # [N, 4] wxyz
w_q, x_q, y_q, z_q = rq[:, 0], rq[:, 1], rq[:, 2], rq[:, 3]
# Robot forward direction in world frame (rotate [1,0,0] by root quaternion)
fwd_x = 1.0 - 2.0 * (y_q * y_q + z_q * z_q)
fwd_y = 2.0 * (x_q * y_q + w_q * z_q)
vel = torch.zeros(env.num_envs, 6, device=device)
vel[:, 0] = v_fwd * fwd_x # world x linear
vel[:, 1] = v_fwd * fwd_y # world y linear
# vel[:, 2] = 0 # z: let physics handle gravity/contact
vel[:, 5] = omega # world z angular (yaw)
robot.write_root_velocity_to_sim(vel)
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
# Read contact sensor data directly from OmniGraph node USD attributes.
# The USD has action graphs that execute each physics step and write outputs to:
# outputs:inContact (bool) outputs:value (float, Newtons)
# /root in USD → /World/envs/env_0/Robot in the loaded scene.
_CONTACT_OG_NODES = {
"left_r": (
"/World/envs/env_0/Robot"
"/l_zhuajia___m2_1_cs/isaac_read_contact_sensor_node"
),
"left_l": (
"/World/envs/env_0/Robot"
"/l_zhuajia___m3_1_cs/isaac_read_contact_sensor_node"
),
}
def _og_contact_reading(stage, node_path: str) -> tuple[bool, float]:
"""Read one OmniGraph contact node output via USD attribute API."""
prim = stage.GetPrimAtPath(node_path)
if not prim.IsValid():
return False, float("nan")
in_contact_attr = prim.GetAttribute("outputs:inContact")
force_attr = prim.GetAttribute("outputs:value")
in_contact = in_contact_attr.Get() if in_contact_attr.IsValid() else False
force = force_attr.Get() if force_attr.IsValid() else 0.0
return bool(in_contact), float(force or 0.0)
# One-time diagnostic: run after first env.reset() so all prims exist
def _diag_contact_nodes() -> None:
import omni.usd
stage = omni.usd.get_context().get_stage()
print("\n[DIAG] OmniGraph contact node check:")
for name, path in _CONTACT_OG_NODES.items():
prim = stage.GetPrimAtPath(path)
if prim.IsValid():
has_in = prim.GetAttribute("outputs:inContact").IsValid()
has_val = prim.GetAttribute("outputs:value").IsValid()
print(f" [{name}] ✓ prim valid | outputs:inContact={has_in} outputs:value={has_val}")
else:
# Try to find similar prims to hint correct path
parent_path = "/".join(path.split("/")[:-1])
parent = stage.GetPrimAtPath(parent_path)
print(f" [{name}] ✗ NOT FOUND at: {path}")
if parent.IsValid():
children = [c.GetName() for c in parent.GetChildren()]
print(f" parent exists, children: {children}")
else:
gp_path = "/".join(path.split("/")[:-2])
gp = stage.GetPrimAtPath(gp_path)
print(f" grandparent '{gp_path}' valid={gp.IsValid()}")
def read_gripper_contact() -> str:
"""Return contact status string by reading OmniGraph node USD attributes."""
try:
import omni.usd
stage = omni.usd.get_context().get_stage()
c_r, f_r = _og_contact_reading(stage, _CONTACT_OG_NODES["left_r"])
c_l, f_l = _og_contact_reading(stage, _CONTACT_OG_NODES["left_l"])
if f_r != f_r: # nan → prim not found
return "prim not found (run with diag)"
sym_r = "" if c_r else ""
sym_l = "" if c_l else ""
return f"{sym_r}R={f_r:.1f}N {sym_l}L={f_l:.1f}N"
except Exception as _e:
return f"N/A ({_e})"
# One-time diagnostic: verify OmniGraph contact sensor node paths
_diag_contact_nodes()
# Create 4 viewport windows bound to the robot cameras
create_robot_viewports()
if is_dual_arm:
teleop_right_ref.reset()
print("\n" + "=" * 50)
print(" Teleoperation Started!")
if is_dual_arm:
print(" LEFT controller → left arm")
print(" RIGHT controller → right arm")
print(" TRIGGER: open/close gripper")
print(" GRIP (hold): move the arm")
print(" Left joystick: Y=forward/back, X=turn")
print(" B / Y: reset environment")
print("=" * 50 + "\n")
device = env.unwrapped.device
sim_frame = 0
obs, _ = env.reset()
clear_ik_target_state()
last_root_left = None
last_root_right = None
neutral_head_rot = None # calibrated on first headset sample after reset
while simulation_app.is_running():
try:
with torch.inference_mode():
if should_reset:
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
if is_dual_arm:
teleop_right_ref.reset()
rebind_robot_viewports()
should_reset = False
sim_frame = 0
last_root_left = None
last_root_right = None
neutral_head_rot = None # re-calibrate on next headset sample
continue
policy_obs = obs["policy"]
wheel_np, v_fwd, omega_base = get_chassis_commands()
# Head tracking: HMD yaw/pitch relative to neutral pose
head_cmd_np, neutral_head_rot = get_head_joint_targets(
teleop_interface.xr_client, neutral_head_rot
)
head_cmd = torch.tensor(head_cmd_np, dtype=torch.float32)
if is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
eef_quat_right = policy_obs["eef_quat_right"][0].cpu().numpy()
left_action = teleop_left.advance(current_eef_pos=eef_pos_left, current_eef_quat=eef_quat_left)
right_action = teleop_right_ref.advance(current_eef_pos=eef_pos_right, current_eef_quat=eef_quat_right)
# Only recompute root-frame IK target when grip is active; otherwise freeze joints
if teleop_left.grip_active or last_root_left is None:
last_root_left = convert_action_world_to_root(left_action)[:7].clone()
if teleop_right_ref.grip_active or last_root_right is None:
last_root_right = convert_action_world_to_root(right_action)[:7].clone()
# Action layout: left_arm(7) | wheel(4) | left_gripper(1) | right_arm(7) | right_gripper(1) | head(2)
action_np = torch.cat([
last_root_left, wheel_np, left_action[7:8],
last_root_right, right_action[7:8],
head_cmd,
])
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
action_np = teleop_interface.advance(current_eef_pos=eef_pos, current_eef_quat=eef_quat)
if teleop_interface.grip_active or last_root_left is None:
last_root_left = convert_action_world_to_root(action_np)[:7].clone()
# Action layout: arm(7) | wheel(4) | gripper(1)
action_np = torch.cat([last_root_left, wheel_np, action_np[7:8]])
actions = action_np.unsqueeze(0).repeat(env.num_envs, 1).to(device)
obs, _, _, _, _ = env.step(actions)
# Direct base velocity override: bypasses isotropic friction limitation
# so skid-steer turning works correctly.
apply_base_velocity(v_fwd, omega_base)
sim_frame += 1
if sim_frame % 30 == 0:
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
policy_obs = obs["policy"]
joint_pos = policy_obs["joint_pos"][0].cpu().numpy()
last_act = policy_obs["actions"][0].cpu().numpy()
if sim_frame == 30:
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
print(f"\n{'='*70}")
print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS")
print(f"{'='*70}")
for i, name in enumerate(jnames):
print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}")
arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
print(f" Deduced left arm indices: {arm_idx}")
print(f"{'='*70}\n")
if not hasattr(env, '_arm_idx_cache'):
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
env._arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
env._right_arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("r_joint")]
arm_joints = joint_pos[env._arm_idx_cache]
right_arm_joints = joint_pos[env._right_arm_idx_cache]
try:
joy_dbg = teleop_interface.xr_client.get_joystick("left")
joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]"
except Exception:
joy_str = "N/A"
if is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| Right Arm Joints (rad): {right_arm_joints}")
print(f"| Left EEF Pos (world, m): {eef_pos_left}")
print(f"| Right EEF Pos (world, m): {eef_pos_right}")
print(f"| Left Gripper Contact: {read_gripper_contact()}")
print(f"| Cmd left_arm(abs): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd left_gripper: {last_act[11:12]} (+1=open -1=close)")
print(f"| Cmd right_arm(abs): {last_act[12:19]}")
print(f"| Cmd right_gripper: {last_act[19:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
if not hasattr(env, '_prev_eef_quat'):
env._prev_eef_quat = eef_quat.copy()
prev_q = env._prev_eef_quat
r_prev = R.from_quat([prev_q[1], prev_q[2], prev_q[3], prev_q[0]])
r_curr = R.from_quat([eef_quat[1], eef_quat[2], eef_quat[3], eef_quat[0]])
delta_rotvec = (r_curr * r_prev.inv()).as_rotvec()
delta_angle_deg = np.degrees(np.linalg.norm(delta_rotvec))
delta_axis = delta_rotvec / (np.linalg.norm(delta_rotvec) + 1e-9)
env._prev_eef_quat = eef_quat.copy()
approach_vec = r_curr.apply(np.array([1.0, 0.0, 0.0]))
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| EEF Pos (world, m): {eef_pos}")
print(f"| EEF approach vec (world): [{approach_vec[0]:+.3f}, {approach_vec[1]:+.3f}, {approach_vec[2]:+.3f}] (夹爪朝向)")
print(f"| EEF rot since last print: {delta_angle_deg:+.2f}° around [{delta_axis[0]:+.3f},{delta_axis[1]:+.3f},{delta_axis[2]:+.3f}] (world)")
print(f"| Cmd arm(abs pos+quat): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd gripper: {last_act[11:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
teleop_interface.close()
if is_dual_arm:
teleop_right_ref.close()
shared_client.close()
env.close()
agent.run()
if __name__ == "__main__":

View File

@@ -0,0 +1,856 @@
#!/usr/bin/env python3
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to run teleoperation with Isaac Lab manipulation environments using PICO XR Controllers.
Only absolute IK mode is supported (Isaac-MindRobot-LeftArm-IK-Abs-v0).
The controller computes delta pose in Isaac Sim world frame and accumulates
an absolute EEF target, which is then converted to robot root frame before
being sent to the DifferentialIK action manager.
"""
import argparse
import logging
import sys
import os
from collections.abc import Callable
# Ensure xr_utils (next to this script) is importable when running directly
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from isaaclab.app import AppLauncher
logger = logging.getLogger(__name__)
# add argparse arguments
parser = argparse.ArgumentParser(
description="Teleoperation for Isaac Lab environments with PICO XR Controller (absolute IK mode only)."
)
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument(
"--task",
type=str,
default="Isaac-MindRobot-LeftArm-IK-Abs-v0",
help="Name of the task (must be an Abs IK task).",
)
parser.add_argument(
"--sensitivity", type=float, default=None,
help="Set both pos and rot sensitivity (overridden by --pos_sensitivity/--rot_sensitivity).",
)
parser.add_argument("--pos_sensitivity", type=float, default=None, help="Position sensitivity. Default: 1.0.")
parser.add_argument("--rot_sensitivity", type=float, default=None, help="Rotation sensitivity. Default: 0.3.")
parser.add_argument("--arm", type=str, default="left", choices=["left", "right"], help="Which arm/controller to use.")
parser.add_argument("--base_speed", type=float, default=5.0, help="Max wheel speed (rad/s) for joystick full forward.")
parser.add_argument("--base_turn", type=float, default=2.0, help="Max wheel differential (rad/s) for joystick full left/right.")
parser.add_argument("--drive_speed", type=float, default=0.5, help="Max robot linear speed (m/s) for direct base control. Default: 0.5")
parser.add_argument("--drive_turn", type=float, default=1.5, help="Max robot yaw rate (rad/s) for direct base control. Default: 1.5")
parser.add_argument(
"--stream-to", type=str, default=None, dest="stream_to",
help="PICO VR headset IP for sim stereo streaming. Streams H264 SBS to the Unity client TCP server. "
"Requires vr_left_eye/vr_right_eye cameras in the env scene config.",
)
parser.add_argument("--stream-port", type=int, default=12345, dest="stream_port", help="TCP port for video streaming.")
parser.add_argument("--stream-bitrate", type=int, default=20_000_000, dest="stream_bitrate", help="H264 bitrate (bps).")
parser.add_argument(
"--debug-viewports", action="store_true", dest="debug_viewports", default=False,
help="Enable debug viewport windows (Left Hand, Right Hand, Head, Chest). Off by default when --stream-to is used.",
)
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
app_launcher_args = vars(args_cli)
app_launcher_args["xr"] = False
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import numpy as np
import torch
from scipy.spatial.transform import Rotation as R
from isaaclab.envs import ManagerBasedRLEnvCfg
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
from xr_utils import XrClient, transform_xr_pose, quat_diff_as_rotvec_xyzw, is_valid_quaternion
from xr_utils.geometry import R_HEADSET_TO_WORLD
# =====================================================================
# Robot Camera Viewport Utilities
# =====================================================================
# Resolved prim paths for env_0 — must match CameraCfg prim_path with
# {ENV_REGEX_NS} → /World/envs/env_0
_ROBOT_CAM_PRIMS: dict[str, str] = {
"Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand",
"Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand",
"Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft",
"Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest",
}
# Stores created viewport window objects
_robot_viewports: dict[str, object] = {}
def create_robot_viewports() -> None:
"""Create 4 viewport windows and bind each to a robot camera prim.
Must be called after env.reset() so all prims exist on the USD stage.
"""
try:
import omni.kit.viewport.utility as vp_util
except ImportError:
logger.warning("[Viewport] omni.kit.viewport.utility not available — skipping viewport creation.")
return
for name, cam_path in _ROBOT_CAM_PRIMS.items():
vp_win = vp_util.create_viewport_window(
f"Robot {name} View", width=640, height=360
)
vp_win.viewport_api.camera_path = cam_path
_robot_viewports[name] = vp_win
print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}")
def rebind_robot_viewports() -> None:
"""Re-bind all robot viewports to their camera paths.
Call this after every env reset so viewports stay locked to the cameras.
"""
for name, vp_win in _robot_viewports.items():
cam_path = _ROBOT_CAM_PRIMS[name]
vp_win.viewport_api.camera_path = cam_path
# =====================================================================
# Helpers
# =====================================================================
def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
"""Convert Isaac-style wxyz quaternion to scipy Rotation."""
return R.from_quat([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])
def _rotation_to_quat_wxyz(rot: R) -> np.ndarray:
"""Convert scipy Rotation to Isaac-style wxyz quaternion."""
q = rot.as_quat()
return np.array([q[3], q[0], q[1], q[2]])
def world_pose_to_root_frame(
pos_w: np.ndarray,
quat_wxyz: np.ndarray,
root_pos_w: np.ndarray,
root_quat_wxyz: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Express a world-frame pose in the robot root frame."""
root_rot = _quat_wxyz_to_rotation(root_quat_wxyz)
pose_rot = _quat_wxyz_to_rotation(quat_wxyz)
pos_root = root_rot.inv().apply(pos_w - root_pos_w)
quat_root = _rotation_to_quat_wxyz(root_rot.inv() * pose_rot)
return pos_root, quat_root
# =====================================================================
# Teleoperation Controller
# =====================================================================
class XrTeleopController:
"""Teleop controller for PICO XR headset (absolute IK mode).
Accumulates an absolute EEF target in Isaac Sim world frame.
Grip button acts as clutch; B/Y buttons trigger environment reset.
"""
def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3,
max_pos_per_step=0.05, max_rot_per_step=0.08, xr_client=None):
self.xr_client = xr_client if xr_client is not None else XrClient()
self._owns_client = xr_client is None # only close if we created it
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
# Hard caps per physics step to prevent IK divergence.
# max_rot_per_step ~4.6°, max_pos_per_step 5 cm.
self.max_pos_per_step = max_pos_per_step
self.max_rot_per_step = max_rot_per_step
self.arm = arm
self.controller_name = "left_controller" if arm == "left" else "right_controller"
self.grip_name = "left_grip" if arm == "left" else "right_grip"
self.trigger_name = "left_trigger" if arm == "left" else "right_trigger"
self.R_headset_world = R_HEADSET_TO_WORLD
self.prev_xr_pos = None
self.prev_xr_quat = None
self.target_eef_pos = None # world frame
self.target_eef_quat = None # world frame, wxyz
self.grip_active = False
self.frame_count = 0
self.reset_button_latched = False
self.require_grip_reengage = False
self.grip_engage_threshold = 0.8
self.grip_release_threshold = 0.2
self.callbacks = {}
def add_callback(self, name: str, func: Callable):
self.callbacks[name] = func
def reset(self):
self.prev_xr_pos = None
self.prev_xr_quat = None
self.grip_active = False
self.frame_count = 0
self.target_eef_pos = None
self.target_eef_quat = None
# Require grip release after reset to avoid driving to stale pose.
self.require_grip_reengage = True
def close(self):
if self._owns_client:
self.xr_client.close()
def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Return 8D hold action [x, y, z, qw, qx, qy, qz, gripper] at frozen target pose."""
action = torch.zeros(8)
if self.target_eef_pos is not None and self.target_eef_quat is not None:
# Prefer frozen world-frame target so IK holds a fixed world position
# even if the robot chassis drifts slightly under gravity.
action[:3] = torch.tensor(self.target_eef_pos.copy())
action[3:7] = torch.tensor(self.target_eef_quat.copy())
elif current_eef_pos is not None and current_eef_quat is not None:
action[:3] = torch.tensor(current_eef_pos.copy())
action[3:7] = torch.tensor(current_eef_quat.copy())
else:
action[3] = 1.0 # identity quaternion
action[7] = 1.0 if trigger > 0.5 else -1.0
return action
def advance(self, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Read XR controller and return 8D absolute action [x, y, z, qw, qx, qy, qz, gripper].
Position and quaternion are in Isaac Sim world frame.
Caller must convert to robot root frame before sending to IK.
"""
try:
# Only left controller Y button resets (not right controller B)
if self.arm == "left":
reset_pressed = self.xr_client.get_button("Y")
else:
reset_pressed = False
if reset_pressed and not self.reset_button_latched:
if "RESET" in self.callbacks:
self.callbacks["RESET"]()
self.reset_button_latched = reset_pressed
except Exception:
pass
try:
raw_pose = self.xr_client.get_pose(self.controller_name)
grip = self.xr_client.get_key_value(self.grip_name)
trigger = self.xr_client.get_key_value(self.trigger_name)
except Exception:
return self.get_zero_action(0.0, current_eef_pos, current_eef_quat)
if not is_valid_quaternion(raw_pose[3:]):
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Wait for grip release after reset
if self.require_grip_reengage:
if grip <= self.grip_release_threshold:
self.require_grip_reengage = False
else:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Grip button as clutch with hysteresis
grip_pressed = grip > self.grip_release_threshold if self.grip_active else grip >= self.grip_engage_threshold
if not grip_pressed:
self.prev_xr_pos = None
self.prev_xr_quat = None
# Only snapshot target on the transition frame (grip just released) or first ever frame.
# After that, keep it frozen so IK holds a fixed world-frame position.
if self.grip_active or self.target_eef_pos is None:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = False
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
if not self.grip_active:
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = True
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Project current and previous XR poses into Isaac Sim world frame
xr_world_pos, xr_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
prev_xr_world_pos, prev_xr_world_quat_xyzw = transform_xr_pose(self.prev_xr_pos, self.prev_xr_quat)
# Delta position (world frame), clamped per step
world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity
pos_norm = np.linalg.norm(world_delta_pos)
if pos_norm > self.max_pos_per_step:
world_delta_pos *= self.max_pos_per_step / pos_norm
# Delta rotation (world frame), clamped per step
world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity
rot_norm = np.linalg.norm(world_delta_rot)
if rot_norm > self.max_rot_per_step:
world_delta_rot *= self.max_rot_per_step / rot_norm
gripper_action = 1.0 if trigger > 0.5 else -1.0
# Accumulate absolute target
if self.target_eef_pos is None:
self.target_eef_pos = np.zeros(3)
self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])
self.target_eef_pos += world_delta_pos
# Apply rotation delta in world frame so controller direction = EEF world direction
target_r = _quat_wxyz_to_rotation(self.target_eef_quat)
self.target_eef_quat = _rotation_to_quat_wxyz(R.from_rotvec(world_delta_rot) * target_r)
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
self.frame_count += 1
action = torch.tensor([
*self.target_eef_pos,
*self.target_eef_quat,
gripper_action,
], dtype=torch.float32)
if self.frame_count % 30 == 0:
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
print("\n====================== [VR TELEOP DEBUG] ======================")
print(f"| Raw VR Pos (OpenXR): {np.array(raw_pose[:3])}")
print(f"| Raw VR Quat (xyzw): {np.array(raw_pose[3:])}")
print(f"| XR Delta Pos (world): {world_delta_pos} (norm={np.linalg.norm(world_delta_pos):.4f})")
print(f"| XR Delta Rot (world): {world_delta_rot} (norm={np.linalg.norm(world_delta_rot):.4f})")
print(f"| Targ Pos (world): {action[:3].numpy()}")
print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}")
print(f"| Gripper & Trigger: Grip={grip:.2f}, Trig={trigger:.2f}")
print("===============================================================")
return action
# =====================================================================
# Head Tracking Utility
# =====================================================================
def get_head_joint_targets(
xr_client, neutral_rot: R | None
) -> tuple[np.ndarray, R | None]:
"""Extract head joint targets [yaw, pitch] from HMD pose.
At first call (neutral_rot is None) the current headset orientation
is recorded as the neutral reference and zeros are returned.
Subsequent calls return yaw/pitch relative to that neutral pose.
Returns:
(joint_targets, neutral_rot) where joint_targets is shape (2,)
float32 [head_yaw_Joint_rad, head_pitch_Joint_rad].
"""
try:
raw_pose = xr_client.get_pose("headset")
if not is_valid_quaternion(raw_pose[3:]):
return np.zeros(2, dtype=np.float32), neutral_rot
_, head_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
head_rot = R.from_quat(head_world_quat_xyzw)
if neutral_rot is None:
# First call — record neutral and hold zero
return np.zeros(2, dtype=np.float32), head_rot
# Relative rotation from neutral heading
rel_rot = head_rot * neutral_rot.inv()
# ZYX Euler: [0]=yaw (Z), [1]=pitch (Y)
euler_zyx = rel_rot.as_euler("ZYX")
yaw = float(np.clip(euler_zyx[0], -1.57, 1.57)) # ±90°
pitch = float(np.clip(euler_zyx[1], -0.52, 0.52)) # ±30°
return np.array([yaw, pitch], dtype=np.float32), neutral_rot
except Exception:
return np.zeros(2, dtype=np.float32), neutral_rot
# =====================================================================
# Main Execution Loop
# =====================================================================
def main() -> None:
"""Run teleoperation with PICO XR Controller against Isaac Lab environment."""
env_cfg = parse_env_cfg(args_cli.task, num_envs=args_cli.num_envs)
env_cfg.env_name = args_cli.task
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
raise ValueError(f"Teleoperation requires ManagerBasedRLEnvCfg. Got: {type(env_cfg)}")
if "Abs" not in args_cli.task:
raise ValueError(
f"Task '{args_cli.task}' is not an absolute IK task. "
"Only Abs tasks are supported (e.g. Isaac-MindRobot-LeftArm-IK-Abs-v0)."
)
env_cfg.terminations.time_out = None
try:
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
except Exception as e:
logger.error(f"Failed to create environment '{args_cli.task}': {e}")
simulation_app.close()
return
# Detect single-arm vs dual-arm based on action manager term names
is_dual_arm = "left_arm_action" in env.action_manager._terms
has_head = "head_action" in env.action_manager._terms
# -- Sim-to-VR stereo streaming setup --
streamer = None
if args_cli.stream_to:
try:
from mindbot.utils.sim_video_streamer import SimVideoStreamer
# Get camera resolution from scene config (vr_left_eye)
scene = env.unwrapped.scene
if "vr_left_eye" not in scene.sensors:
logger.warning("[Stream] vr_left_eye camera not found in scene. Streaming disabled.")
else:
cam = scene["vr_left_eye"]
cam_w = cam.cfg.width
cam_h = cam.cfg.height
streamer = SimVideoStreamer(
host=args_cli.stream_to,
port=args_cli.stream_port,
width=cam_w,
height=cam_h,
fps=30,
bitrate=args_cli.stream_bitrate,
)
if streamer.connect():
print(f"[INFO] Sim stereo streaming to {args_cli.stream_to}:{args_cli.stream_port} "
f"({cam_w*2}x{cam_h} SBS @ 30fps)")
else:
logger.error("[Stream] Failed to connect. Streaming disabled.")
streamer = None
except ImportError:
logger.error("[Stream] mindbot.utils.sim_video_streamer not found. pip install av?")
except Exception as e:
logger.error(f"[Stream] Init failed: {e}")
# Sensitivity: explicit args override --sensitivity, which overrides defaults
pos_sens = 1.0
rot_sens = 0.3
if args_cli.sensitivity is not None:
pos_sens = args_cli.sensitivity
rot_sens = args_cli.sensitivity
if args_cli.pos_sensitivity is not None:
pos_sens = args_cli.pos_sensitivity
if args_cli.rot_sensitivity is not None:
rot_sens = args_cli.rot_sensitivity
if is_dual_arm:
print(f"\n[INFO] Dual-arm mode detected. Using both controllers.")
print(f"[INFO] IK Mode: ABSOLUTE")
print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}")
shared_client = XrClient()
teleop_left = XrTeleopController(arm="left", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client)
teleop_right = XrTeleopController(arm="right", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client)
teleop_interface = teleop_left # primary interface for callbacks/reset
teleop_right_ref = teleop_right
else:
print(f"\n[INFO] Connecting to PICO XR Headset using {args_cli.arm} controller...")
print(f"[INFO] IK Mode: ABSOLUTE")
print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}")
teleop_interface = XrTeleopController(
arm=args_cli.arm,
pos_sensitivity=pos_sens,
rot_sensitivity=rot_sens,
)
should_reset = False
def request_reset():
nonlocal should_reset
should_reset = True
print("[INFO] Reset requested via XR button.")
teleop_interface.add_callback("RESET", request_reset)
def clear_ik_target_state():
"""Sync IK controller's internal target to current EEF pose to avoid jumps after reset."""
if is_dual_arm:
for term_key in ["left_arm_action", "right_arm_action"]:
arm_action_term = env.action_manager._terms[term_key]
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
else:
arm_action_term = env.action_manager._terms["arm_action"]
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
def convert_action_world_to_root(action_tensor: torch.Tensor) -> torch.Tensor:
"""Convert absolute IK action from world frame to robot root frame."""
robot = env.unwrapped.scene["robot"]
root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy()
root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy()
pos_root, quat_root = world_pose_to_root_frame(
action_tensor[:3].numpy(), action_tensor[3:7].numpy(),
root_pos_w, root_quat_w,
)
out = action_tensor.clone()
out[:3] = torch.tensor(pos_root, dtype=torch.float32)
out[3:7] = torch.tensor(quat_root, dtype=torch.float32)
return out
def get_chassis_commands() -> tuple[torch.Tensor, float, float]:
"""Read left joystick → (wheel_cmd 4D, v_fwd m/s, omega rad/s).
wheel_cmd : differential wheel velocities for the action vector (training data).
v_fwd : desired robot forward speed in m/s (for direct base control).
omega : desired robot yaw rate in rad/s (for direct base control).
"""
try:
joy = teleop_interface.xr_client.get_joystick("left")
jy = float(joy[1])
jx = float(joy[0])
except Exception:
return torch.zeros(4), 0.0, 0.0
# Wheel velocity commands for action vector (rad/s)
v_w = jy * args_cli.base_speed
omega_w = jx * args_cli.base_turn
right_vel = v_w - omega_w
left_vel = v_w + omega_w
wheel_cmd = torch.tensor([right_vel, left_vel, left_vel, right_vel], dtype=torch.float32)
if abs(v_w) > 0.1 or abs(omega_w) > 0.1:
print(f"[WHEEL] joy=({jx:+.2f},{jy:+.2f}) v_wheel={v_w:.2f} ω_wheel={omega_w:.2f}{wheel_cmd.tolist()}")
# Direct base velocity commands (m/s, rad/s)
v_fwd = jy * args_cli.drive_speed
omega = -jx * args_cli.drive_turn # jx<0 (left push) → positive yaw = left turn
return wheel_cmd, v_fwd, omega
def apply_base_velocity(v_fwd: float, omega: float) -> None:
"""Directly set robot root linear+angular velocity to bypass isotropic friction.
PhysX uses isotropic friction, so skid-steer lateral slip is impossible through
wheel torques alone. This function overrides the base velocity each step to give
clean translational + rotational motion regardless of friction.
The write is buffered and applied at the start of the next env.step().
"""
if abs(v_fwd) < 1e-4 and abs(omega) < 1e-4:
return
robot = env.unwrapped.scene["robot"]
rq = robot.data.root_quat_w # [N, 4] wxyz
w_q, x_q, y_q, z_q = rq[:, 0], rq[:, 1], rq[:, 2], rq[:, 3]
# Robot forward direction in world frame (rotate [1,0,0] by root quaternion)
fwd_x = 1.0 - 2.0 * (y_q * y_q + z_q * z_q)
fwd_y = 2.0 * (x_q * y_q + w_q * z_q)
vel = torch.zeros(env.num_envs, 6, device=device)
vel[:, 0] = v_fwd * fwd_x # world x linear
vel[:, 1] = v_fwd * fwd_y # world y linear
# vel[:, 2] = 0 # z: let physics handle gravity/contact
vel[:, 5] = omega # world z angular (yaw)
robot.write_root_velocity_to_sim(vel)
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
# Read contact sensor data directly from OmniGraph node USD attributes.
# The USD has action graphs that execute each physics step and write outputs to:
# outputs:inContact (bool) outputs:value (float, Newtons)
# /root in USD → /World/envs/env_0/Robot in the loaded scene.
_CONTACT_OG_NODES = {
"left_r": (
"/World/envs/env_0/Robot"
"/l_zhuajia___m2_1_cs/isaac_read_contact_sensor_node"
),
"left_l": (
"/World/envs/env_0/Robot"
"/l_zhuajia___m3_1_cs/isaac_read_contact_sensor_node"
),
}
def _og_contact_reading(stage, node_path: str) -> tuple[bool, float]:
"""Read one OmniGraph contact node output via USD attribute API."""
prim = stage.GetPrimAtPath(node_path)
if not prim.IsValid():
return False, float("nan")
in_contact_attr = prim.GetAttribute("outputs:inContact")
force_attr = prim.GetAttribute("outputs:value")
in_contact = in_contact_attr.Get() if in_contact_attr.IsValid() else False
force = force_attr.Get() if force_attr.IsValid() else 0.0
return bool(in_contact), float(force or 0.0)
# One-time diagnostic: run after first env.reset() so all prims exist
def _diag_contact_nodes() -> None:
import omni.usd
stage = omni.usd.get_context().get_stage()
print("\n[DIAG] OmniGraph contact node check:")
for name, path in _CONTACT_OG_NODES.items():
prim = stage.GetPrimAtPath(path)
if prim.IsValid():
has_in = prim.GetAttribute("outputs:inContact").IsValid()
has_val = prim.GetAttribute("outputs:value").IsValid()
print(f" [{name}] ✓ prim valid | outputs:inContact={has_in} outputs:value={has_val}")
else:
# Try to find similar prims to hint correct path
parent_path = "/".join(path.split("/")[:-1])
parent = stage.GetPrimAtPath(parent_path)
print(f" [{name}] ✗ NOT FOUND at: {path}")
if parent.IsValid():
children = [c.GetName() for c in parent.GetChildren()]
print(f" parent exists, children: {children}")
else:
gp_path = "/".join(path.split("/")[:-2])
gp = stage.GetPrimAtPath(gp_path)
print(f" grandparent '{gp_path}' valid={gp.IsValid()}")
def read_gripper_contact() -> str:
"""Return contact status string by reading OmniGraph node USD attributes."""
try:
import omni.usd
stage = omni.usd.get_context().get_stage()
c_r, f_r = _og_contact_reading(stage, _CONTACT_OG_NODES["left_r"])
c_l, f_l = _og_contact_reading(stage, _CONTACT_OG_NODES["left_l"])
if f_r != f_r: # nan → prim not found
return "prim not found (run with diag)"
sym_r = "" if c_r else ""
sym_l = "" if c_l else ""
return f"{sym_r}R={f_r:.1f}N {sym_l}L={f_l:.1f}N"
except Exception as _e:
return f"N/A ({_e})"
# One-time diagnostic: verify OmniGraph contact sensor node paths
_diag_contact_nodes()
# Create debug viewport windows (always on)
create_robot_viewports()
if is_dual_arm:
teleop_right_ref.reset()
print("\n" + "=" * 50)
print(" Teleoperation Started!")
if is_dual_arm:
print(" LEFT controller → left arm")
print(" RIGHT controller → right arm")
print(" TRIGGER: open/close gripper")
print(" GRIP (hold): move the arm")
print(" Left joystick: Y=forward/back, X=turn")
print(" Y (left controller): reset environment")
print("=" * 50 + "\n")
device = env.unwrapped.device
sim_frame = 0
obs, _ = env.reset()
clear_ik_target_state()
last_root_left = None
last_root_right = None
neutral_head_rot = None # calibrated on first headset sample after reset
# trunk_action uses use_default_offset=False, so action = absolute position target.
trunk_cmd = torch.tensor([0.1], dtype=torch.float32)
while simulation_app.is_running():
try:
with torch.inference_mode():
if should_reset:
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
if is_dual_arm:
teleop_right_ref.reset()
rebind_robot_viewports()
should_reset = False
sim_frame = 0
last_root_left = None
last_root_right = None
neutral_head_rot = None # re-calibrate on next headset sample
continue
policy_obs = obs["policy"]
wheel_np, v_fwd, omega_base = get_chassis_commands()
# Head tracking: HMD yaw/pitch relative to neutral pose
head_cmd_np, neutral_head_rot = get_head_joint_targets(
teleop_interface.xr_client, neutral_head_rot
)
head_cmd = torch.tensor(head_cmd_np, dtype=torch.float32)
if is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
eef_quat_right = policy_obs["eef_quat_right"][0].cpu().numpy()
left_action = teleop_left.advance(current_eef_pos=eef_pos_left, current_eef_quat=eef_quat_left)
right_action = teleop_right_ref.advance(current_eef_pos=eef_pos_right, current_eef_quat=eef_quat_right)
# Only recompute root-frame IK target when grip is active; otherwise freeze joints
if teleop_left.grip_active or last_root_left is None:
last_root_left = convert_action_world_to_root(left_action)[:7].clone()
if teleop_right_ref.grip_active or last_root_right is None:
last_root_right = convert_action_world_to_root(right_action)[:7].clone()
# Action layout depends on task:
# with head: left_arm(7) | wheel(4) | left_gripper(1) | right_arm(7) | right_gripper(1) | head(2) | trunk(1) = 23D
# without head: left_arm(7) | wheel(4) | left_gripper(1) | right_arm(7) | right_gripper(1) = 20D
parts = [
last_root_left, wheel_np, left_action[7:8],
last_root_right, right_action[7:8],
]
if has_head:
parts.append(head_cmd)
parts.append(trunk_cmd)
action_np = torch.cat(parts)
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
action_np = teleop_interface.advance(current_eef_pos=eef_pos, current_eef_quat=eef_quat)
if teleop_interface.grip_active or last_root_left is None:
last_root_left = convert_action_world_to_root(action_np)[:7].clone()
# Action layout: arm(7) | wheel(4) | gripper(1)
action_np = torch.cat([last_root_left, wheel_np, action_np[7:8]])
actions = action_np.unsqueeze(0).repeat(env.num_envs, 1).to(device)
obs, _, _, _, _ = env.step(actions)
# Direct base velocity override: bypasses isotropic friction limitation
# so skid-steer turning works correctly.
apply_base_velocity(v_fwd, omega_base)
# Stream sim stereo to VR headset (if enabled)
if streamer is not None:
try:
scene = env.unwrapped.scene
left_rgb = scene["vr_left_eye"].data.output["rgb"][0].cpu().numpy()
right_rgb = scene["vr_right_eye"].data.output["rgb"][0].cpu().numpy()
if not streamer.send_frame(left_rgb, right_rgb):
logger.warning("[Stream] Connection lost. Disabling streaming.")
streamer = None
except Exception as e:
if sim_frame % 300 == 0:
logger.warning(f"[Stream] Frame error: {e}")
sim_frame += 1
if sim_frame % 30 == 0:
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
policy_obs = obs["policy"]
joint_pos = policy_obs["joint_pos"][0].cpu().numpy()
last_act = policy_obs["actions"][0].cpu().numpy()
if sim_frame == 30:
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
print(f"\n{'='*70}")
print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS")
print(f"{'='*70}")
for i, name in enumerate(jnames):
print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}")
arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
print(f" Deduced left arm indices: {arm_idx}")
print(f"{'='*70}\n")
if not hasattr(env, '_arm_idx_cache'):
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
env._arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
env._right_arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("r_joint")]
arm_joints = joint_pos[env._arm_idx_cache]
right_arm_joints = joint_pos[env._right_arm_idx_cache]
try:
joy_dbg = teleop_interface.xr_client.get_joystick("left")
joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]"
except Exception:
joy_str = "N/A"
if is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| Right Arm Joints (rad): {right_arm_joints}")
print(f"| Left EEF Pos (world, m): {eef_pos_left}")
print(f"| Right EEF Pos (world, m): {eef_pos_right}")
print(f"| Left Gripper Contact: {read_gripper_contact()}")
print(f"| Cmd left_arm(abs): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd left_gripper: {last_act[11:12]} (+1=open -1=close)")
print(f"| Cmd right_arm(abs): {last_act[12:19]}")
print(f"| Cmd right_gripper: {last_act[19:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
if not hasattr(env, '_prev_eef_quat'):
env._prev_eef_quat = eef_quat.copy()
prev_q = env._prev_eef_quat
r_prev = R.from_quat([prev_q[1], prev_q[2], prev_q[3], prev_q[0]])
r_curr = R.from_quat([eef_quat[1], eef_quat[2], eef_quat[3], eef_quat[0]])
delta_rotvec = (r_curr * r_prev.inv()).as_rotvec()
delta_angle_deg = np.degrees(np.linalg.norm(delta_rotvec))
delta_axis = delta_rotvec / (np.linalg.norm(delta_rotvec) + 1e-9)
env._prev_eef_quat = eef_quat.copy()
approach_vec = r_curr.apply(np.array([1.0, 0.0, 0.0]))
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| EEF Pos (world, m): {eef_pos}")
print(f"| EEF approach vec (world): [{approach_vec[0]:+.3f}, {approach_vec[1]:+.3f}, {approach_vec[2]:+.3f}] (夹爪朝向)")
print(f"| EEF rot since last print: {delta_angle_deg:+.2f}° around [{delta_axis[0]:+.3f},{delta_axis[1]:+.3f},{delta_axis[2]:+.3f}] (world)")
print(f"| Cmd arm(abs pos+quat): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd gripper: {last_act[11:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
if streamer is not None:
streamer.close()
teleop_interface.close()
if is_dual_arm:
teleop_right_ref.close()
shared_client.close()
env.close()
if __name__ == "__main__":
main()
simulation_app.close()

View File

@@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# SPDX-License-Identifier: BSD-3-Clause
"""XR teleoperation package for MindBot.
Agent hierarchy:
BaseTeleopAgent → SingleArmXrAgent (8D)
BaseTeleopAgent → DualArmXrAgent (20D) → DualArmHeadXrAgent (23D)
"""
from .base_agent import BaseTeleopAgent
from .single_arm_agent import SingleArmXrAgent
from .dual_arm_agent import DualArmXrAgent
from .dual_arm_head_agent import DualArmHeadXrAgent
from .xr_controller import XrTeleopController
from .chassis import ChassisController
from .head_tracker import HeadTracker
from .streaming import StreamingManager
from .diagnostics import DiagnosticsReporter
__all__ = [
"BaseTeleopAgent",
"SingleArmXrAgent",
"DualArmXrAgent",
"DualArmHeadXrAgent",
"XrTeleopController",
"ChassisController",
"HeadTracker",
"StreamingManager",
"DiagnosticsReporter",
]

View File

@@ -0,0 +1,136 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Base teleoperation agent with main loop, env management, and reset handling."""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
import torch
logger = logging.getLogger(__name__)
class BaseTeleopAgent(ABC):
"""Abstract base for XR teleoperation agents.
Provides the main simulation loop, env lifecycle, and reset handling.
Subclasses implement assemble_action() to define their control dimensions.
"""
def __init__(self, env, simulation_app, *, debug_viewports: bool = True):
self.env = env
self.simulation_app = simulation_app
self.device = env.unwrapped.device
self.num_envs = env.num_envs
self._should_reset = False
self.sim_frame = 0
# Viewport management
self._robot_viewports: dict[str, object] = {}
self._robot_cam_prims: dict[str, str] = {
"Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand",
"Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand",
"Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft",
"Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest",
}
if debug_viewports:
self._create_viewports()
def request_reset(self):
self._should_reset = True
print("[INFO] Reset requested via XR button.")
def _create_viewports(self):
try:
import omni.kit.viewport.utility as vp_util
except ImportError:
logger.warning("[Viewport] omni.kit.viewport.utility not available.")
return
for name, cam_path in self._robot_cam_prims.items():
vp_win = vp_util.create_viewport_window(f"Robot {name} View", width=640, height=360)
vp_win.viewport_api.camera_path = cam_path
self._robot_viewports[name] = vp_win
print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}")
def _rebind_viewports(self):
for name, vp_win in self._robot_viewports.items():
vp_win.viewport_api.camera_path = self._robot_cam_prims[name]
def _clear_ik_target_state(self):
"""Sync IK controller internals to current EEF pose to prevent jumps after reset."""
action_terms = self.env.action_manager._terms
for key in self._ik_action_term_names():
if key in action_terms:
term = action_terms[key]
ee_pos_b, ee_quat_b = term._compute_frame_pose()
term._raw_actions.zero_()
term._processed_actions.zero_()
term._ik_controller._command.zero_()
term._ik_controller.ee_pos_des[:] = ee_pos_b
term._ik_controller.ee_quat_des[:] = ee_quat_b
def _ik_action_term_names(self) -> list[str]:
"""Return IK action term names to clear on reset. Override in subclass."""
return ["arm_action"]
def _do_reset(self, obs):
"""Execute reset sequence. Returns new obs."""
obs, _ = self.env.reset()
self._clear_ik_target_state()
self._rebind_viewports()
self._should_reset = False
self.sim_frame = 0
self.on_reset()
return obs
def on_reset(self):
"""Hook for subclasses to reset their own state."""
pass
@abstractmethod
def assemble_action(self, obs) -> torch.Tensor:
"""Build the full action tensor for env.step(). Must be implemented by subclass."""
...
def post_step(self, obs):
"""Hook called after env.step(). Subclasses can add streaming, diagnostics, etc."""
pass
def run(self):
"""Main simulation loop."""
obs, _ = self.env.reset()
self._clear_ik_target_state()
self.on_reset()
self._print_banner()
while self.simulation_app.is_running():
try:
with torch.inference_mode():
if self._should_reset:
obs = self._do_reset(obs)
continue
action = self.assemble_action(obs)
actions = action.unsqueeze(0).repeat(self.num_envs, 1).to(self.device)
obs, _, _, _, _ = self.env.step(actions)
self.sim_frame += 1
self.post_step(obs)
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
self.cleanup()
def _print_banner(self):
print("\n" + "=" * 50)
print(" Teleoperation Started!")
print("=" * 50 + "\n")
def cleanup(self):
"""Release resources. Override in subclass to close controllers, streamers, etc."""
self.env.close()

View File

@@ -0,0 +1,58 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Chassis (mobile base) control via XR joystick."""
import torch
class ChassisController:
"""Converts left joystick input into wheel velocities and direct base velocity commands.
Args:
base_speed: Max wheel speed (rad/s) at full joystick.
base_turn: Max wheel differential (rad/s) at full joystick.
drive_speed: Max robot linear speed (m/s) for direct base control.
drive_turn: Max robot yaw rate (rad/s) for direct base control.
"""
def __init__(self, base_speed: float = 5.0, base_turn: float = 2.0,
drive_speed: float = 0.5, drive_turn: float = 1.5):
self.base_speed = base_speed
self.base_turn = base_turn
self.drive_speed = drive_speed
self.drive_turn = drive_turn
def get_commands(self, xr_client) -> tuple[torch.Tensor, float, float]:
"""Read left joystick and return (wheel_cmd_4D, v_fwd_m_s, omega_rad_s)."""
try:
joy = xr_client.get_joystick("left")
jy = float(joy[1])
jx = float(joy[0])
except Exception:
return torch.zeros(4), 0.0, 0.0
v_w = jy * self.base_speed
omega_w = jx * self.base_turn
right_vel = v_w - omega_w
left_vel = v_w + omega_w
wheel_cmd = torch.tensor([right_vel, left_vel, left_vel, right_vel], dtype=torch.float32)
v_fwd = jy * self.drive_speed
omega = -jx * self.drive_turn # left push → positive yaw = left turn
return wheel_cmd, v_fwd, omega
@staticmethod
def apply_base_velocity(robot, v_fwd: float, omega: float, num_envs: int, device) -> None:
"""Directly set robot root velocity to bypass isotropic friction for skid-steer turning."""
if abs(v_fwd) < 1e-4 and abs(omega) < 1e-4:
return
rq = robot.data.root_quat_w # [N, 4] wxyz
w_q, x_q, y_q, z_q = rq[:, 0], rq[:, 1], rq[:, 2], rq[:, 3]
fwd_x = 1.0 - 2.0 * (y_q * y_q + z_q * z_q)
fwd_y = 2.0 * (x_q * y_q + w_q * z_q)
vel = torch.zeros(num_envs, 6, device=device)
vel[:, 0] = v_fwd * fwd_x
vel[:, 1] = v_fwd * fwd_y
vel[:, 5] = omega
robot.write_root_velocity_to_sim(vel)

View File

@@ -0,0 +1,98 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Periodic diagnostics reporter for teleoperation debugging."""
import numpy as np
class DiagnosticsReporter:
"""Prints robot state info every N frames during teleoperation."""
def __init__(self, interval: int = 30, is_dual_arm: bool = False):
self.interval = interval
self.is_dual_arm = is_dual_arm
self._left_arm_idx: list[int] | None = None
self._right_arm_idx: list[int] | None = None
self._joint_names_printed = False
def _ensure_indices(self, robot):
if self._left_arm_idx is None:
jnames = robot.joint_names
self._left_arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
self._right_arm_idx = [i for i, n in enumerate(jnames) if n.startswith("r_joint")]
self._trunk_idx = jnames.index("PrismaticJoint") if "PrismaticJoint" in jnames else None
def report(self, env, obs, frame: int, last_act: np.ndarray | None = None,
xr_client=None, read_gripper_contact=None, trunk_target: float | None = None):
"""Print diagnostics if frame is on interval boundary."""
if frame % self.interval != 0:
return
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
policy_obs = obs["policy"]
joint_pos = policy_obs["joint_pos"][0].cpu().numpy()
if last_act is None:
last_act = policy_obs["actions"][0].cpu().numpy()
robot = env.unwrapped.scene["robot"]
self._ensure_indices(robot)
# Print full joint table on first report
if not self._joint_names_printed:
jnames = robot.joint_names
print(f"\n{'=' * 70}")
print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS")
print(f"{'=' * 70}")
for i, name in enumerate(jnames):
print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}")
print(f" Deduced left arm indices: {self._left_arm_idx}")
print(f"{'=' * 70}\n")
self._joint_names_printed = True
arm_joints = joint_pos[self._left_arm_idx]
right_arm_joints = joint_pos[self._right_arm_idx]
joy_str = "N/A"
if xr_client is not None:
try:
joy_dbg = xr_client.get_joystick("left")
joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]"
except Exception:
pass
if self.is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
contact_str = read_gripper_contact() if read_gripper_contact else "N/A"
print(f"\n---------------- [ROBOT STATE frame={frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| Right Arm Joints (rad): {right_arm_joints}")
print(f"| Left EEF Pos (world, m): {eef_pos_left}")
print(f"| Right EEF Pos (world, m): {eef_pos_right}")
print(f"| Left Gripper Contact: {contact_str}")
print(f"| Cmd left_arm(abs): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd left_gripper: {last_act[11:12]} (+1=open -1=close)")
print(f"| Cmd right_arm(abs): {last_act[12:19]}")
print(f"| Cmd right_gripper: {last_act[19:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
if self._trunk_idx is not None:
trunk_pos = robot.data.joint_pos[0, self._trunk_idx].item()
trunk_vel = robot.data.joint_vel[0, self._trunk_idx].item()
trunk_force = robot.data.applied_torque[0, self._trunk_idx].item() if hasattr(robot.data, 'applied_torque') else float('nan')
trunk_tgt_str = f"{trunk_target:.4f}" if trunk_target is not None else "N/A"
print(f"| Trunk pos={trunk_pos:.4f} vel={trunk_vel:.6f} target={trunk_tgt_str} force={trunk_force:.2f}")
print(f"----------------------------------------------------------------")
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
print(f"\n---------------- [ROBOT STATE frame={frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| EEF Pos (world, m): {eef_pos}")
print(f"| Cmd arm(abs pos+quat): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd gripper: {last_act[11:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")

View File

@@ -0,0 +1,139 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Dual-arm XR teleoperation agent with chassis control."""
from __future__ import annotations
import torch
from xr_utils import XrClient
from .base_agent import BaseTeleopAgent
from .xr_controller import XrTeleopController
from .chassis import ChassisController
from .frame_utils import convert_action_world_to_root
class DualArmXrAgent(BaseTeleopAgent):
"""Dual-arm teleoperation with chassis joystick control.
Action: left_arm(7) | wheel(4) | left_grip(1) | right_arm(7) | right_grip(1) = 20D
"""
def __init__(self, env, simulation_app, *,
pos_sensitivity: float = 1.0,
rot_sensitivity: float = 0.3,
base_speed: float = 5.0,
base_turn: float = 2.0,
drive_speed: float = 0.5,
drive_turn: float = 1.5,
debug_viewports: bool = True):
super().__init__(env, simulation_app, debug_viewports=debug_viewports)
self.shared_client = XrClient()
self.teleop_left = XrTeleopController(
arm="left", pos_sensitivity=pos_sensitivity,
rot_sensitivity=rot_sensitivity, xr_client=self.shared_client,
)
self.teleop_right = XrTeleopController(
arm="right", pos_sensitivity=pos_sensitivity,
rot_sensitivity=rot_sensitivity, xr_client=self.shared_client,
)
self.teleop_left.add_callback("RESET", self.request_reset)
self.chassis = ChassisController(
base_speed=base_speed, base_turn=base_turn,
drive_speed=drive_speed, drive_turn=drive_turn,
)
self._last_root_left = None
self._last_root_right = None
@property
def xr_client(self):
return self.shared_client
def _ik_action_term_names(self) -> list[str]:
return ["left_arm_action", "right_arm_action"]
def on_reset(self):
self.teleop_left.reset()
self.teleop_right.reset()
self._last_root_left = None
self._last_root_right = None
def assemble_action(self, obs) -> torch.Tensor:
policy_obs = obs["policy"]
robot = self.env.unwrapped.scene["robot"]
# Get robot root pose for world→body EEF conversion
root_pos_w = robot.data.root_pos_w[0].cpu().numpy()
root_quat_w = robot.data.root_quat_w[0].cpu().numpy()
# Read chassis
wheel_cmd, self._v_fwd, self._omega = self.chassis.get_commands(self.shared_client)
# Convert EEF observations from world frame to body frame
from .frame_utils import world_pose_to_root_frame
eef_pos_left_w = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left_w = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_left_b, eef_quat_left_b = world_pose_to_root_frame(
eef_pos_left_w, eef_quat_left_w, root_pos_w, root_quat_w)
eef_pos_right_w = policy_obs["eef_pos_right"][0].cpu().numpy()
eef_quat_right_w = policy_obs["eef_quat_right"][0].cpu().numpy()
eef_pos_right_b, eef_quat_right_b = world_pose_to_root_frame(
eef_pos_right_w, eef_quat_right_w, root_pos_w, root_quat_w)
# XR controller works in body frame:
# - current_eef is body frame (converted above)
# - XR delta is treated as body-frame delta directly (no rotation)
# because in VR teleop the user's visual frame tracks the robot heading
# - output is body frame → send directly to IK
left_action = self.teleop_left.advance(current_eef_pos=eef_pos_left_b, current_eef_quat=eef_quat_left_b)
right_action = self.teleop_right.advance(current_eef_pos=eef_pos_right_b, current_eef_quat=eef_quat_right_b)
# Joint-locking: target is already in body frame, use directly
if self.teleop_left.grip_active or self._last_root_left is None:
self._last_root_left = left_action[:7].clone()
if self.teleop_right.grip_active or self._last_root_right is None:
self._last_root_right = right_action[:7].clone()
# Diagnostic: print once per second when left grip active
_cnt = getattr(self, '_diag_cnt', 0) + 1
self._diag_cnt = _cnt
if self.teleop_left.grip_active and _cnt % 30 == 0:
import math
print(f"[FRAME DIAG] root_quat_w={root_quat_w}")
print(f" eef_left_world=({eef_pos_left_w[0]:.3f},{eef_pos_left_w[1]:.3f},{eef_pos_left_w[2]:.3f})")
print(f" eef_left_body =({eef_pos_left_b[0]:.3f},{eef_pos_left_b[1]:.3f},{eef_pos_left_b[2]:.3f})")
print(f" ik_cmd_body =({self._last_root_left[0]:.3f},{self._last_root_left[1]:.3f},{self._last_root_left[2]:.3f})")
# left_arm(7) | wheel(4) | left_grip(1) | right_arm(7) | right_grip(1)
return torch.cat([
self._last_root_left, wheel_cmd, left_action[7:8],
self._last_root_right, right_action[7:8],
])
def post_step(self, obs):
# Apply direct base velocity override for skid-steer
robot = self.env.unwrapped.scene["robot"]
self.chassis.apply_base_velocity(robot, self._v_fwd, self._omega, self.num_envs, self.device)
def cleanup(self):
self.teleop_left.close()
self.teleop_right.close()
self.shared_client.close()
super().cleanup()
def _print_banner(self):
print("\n" + "=" * 50)
print(" Teleoperation Started!")
print(" LEFT controller -> left arm")
print(" RIGHT controller -> right arm")
print(" TRIGGER: open/close gripper")
print(" GRIP (hold): move the arm")
print(" Left joystick: Y=forward/back, X=turn")
print(" Y (left controller): reset environment")
print("=" * 50 + "\n")

View File

@@ -0,0 +1,91 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Dual-arm + head tracking + trunk + stereo streaming XR agent."""
from __future__ import annotations
import torch
from .dual_arm_agent import DualArmXrAgent
from .head_tracker import HeadTracker
from .streaming import StreamingManager
from .diagnostics import DiagnosticsReporter
class DualArmHeadXrAgent(DualArmXrAgent):
"""Extends DualArmXrAgent with head tracking, trunk hold, and VR stereo streaming.
Action: left_arm(7) | wheel(4) | left_grip(1) | right_arm(7) | right_grip(1) | head(2) | trunk(1) = 23D
"""
def __init__(self, env, simulation_app, *,
pos_sensitivity: float = 1.0,
rot_sensitivity: float = 0.3,
base_speed: float = 5.0,
base_turn: float = 2.0,
drive_speed: float = 0.5,
drive_turn: float = 1.5,
stream_to: str | None = None,
stream_port: int = 12345,
stream_bitrate: int = 20_000_000,
debug_viewports: bool = True):
super().__init__(
env, simulation_app,
pos_sensitivity=pos_sensitivity, rot_sensitivity=rot_sensitivity,
base_speed=base_speed, base_turn=base_turn,
drive_speed=drive_speed, drive_turn=drive_turn,
debug_viewports=debug_viewports,
)
self.head_tracker = HeadTracker()
# Read trunk initial position from ArticulationCfg (single source of truth)
robot = env.unwrapped.scene["robot"]
joint_names = robot.data.joint_names
trunk_idx = joint_names.index("PrismaticJoint")
trunk_init = robot.data.default_joint_pos[0, trunk_idx].item()
self.trunk_cmd = torch.tensor([trunk_init], dtype=torch.float32)
# Streaming
self.streamer: StreamingManager | None = None
if stream_to:
scene = env.unwrapped.scene
self.streamer = StreamingManager(stream_to, stream_port, scene, bitrate=stream_bitrate)
# Diagnostics
self.diagnostics = DiagnosticsReporter(interval=30, is_dual_arm=True)
def on_reset(self):
super().on_reset()
self.head_tracker.reset()
def assemble_action(self, obs) -> torch.Tensor:
base_action = super().assemble_action(obs)
# Head tracking
head_targets = self.head_tracker.get_targets(self.shared_client)
head_cmd = torch.tensor(head_targets, dtype=torch.float32)
# base(20) | head(2) | trunk(1)
return torch.cat([base_action, head_cmd, self.trunk_cmd])
def post_step(self, obs):
super().post_step(obs)
# Stereo streaming
if self.streamer is not None:
scene = self.env.unwrapped.scene
self.streamer.send(scene, self.sim_frame)
# Diagnostics
self.diagnostics.report(
self.env, obs, self.sim_frame,
xr_client=self.shared_client,
trunk_target=self.trunk_cmd[0].item(),
)
def cleanup(self):
if self.streamer is not None:
self.streamer.close()
super().cleanup()

View File

@@ -0,0 +1,55 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Coordinate frame utilities for XR teleoperation."""
import numpy as np
import torch
from scipy.spatial.transform import Rotation as R
def quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
"""Convert Isaac-style wxyz quaternion to scipy Rotation."""
return R.from_quat([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])
def rotation_to_quat_wxyz(rot: R) -> np.ndarray:
"""Convert scipy Rotation to Isaac-style wxyz quaternion."""
q = rot.as_quat()
return np.array([q[3], q[0], q[1], q[2]])
def world_pose_to_root_frame(
pos_w: np.ndarray,
quat_wxyz: np.ndarray,
root_pos_w: np.ndarray,
root_quat_wxyz: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Express a world-frame pose in the robot root frame."""
root_rot = quat_wxyz_to_rotation(root_quat_wxyz)
pose_rot = quat_wxyz_to_rotation(quat_wxyz)
pos_root = root_rot.inv().apply(pos_w - root_pos_w)
quat_root = rotation_to_quat_wxyz(root_rot.inv() * pose_rot)
return pos_root, quat_root
def convert_action_world_to_root(action_tensor: torch.Tensor, robot) -> torch.Tensor:
"""Convert absolute IK action from world frame to robot root frame.
Args:
action_tensor: 8D tensor [x, y, z, qw, qx, qy, qz, gripper] in world frame.
robot: Isaac Lab Articulation object (env.unwrapped.scene["robot"]).
Returns:
8D tensor with pos/quat converted to robot root frame.
"""
root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy()
root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy()
pos_root, quat_root = world_pose_to_root_frame(
action_tensor[:3].numpy(), action_tensor[3:7].numpy(),
root_pos_w, root_quat_w,
)
out = action_tensor.clone()
out[:3] = torch.tensor(pos_root, dtype=torch.float32)
out[3:7] = torch.tensor(quat_root, dtype=torch.float32)
return out

View File

@@ -0,0 +1,44 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Head tracking via XR headset (HMD) for robot head joint control."""
import numpy as np
from scipy.spatial.transform import Rotation as R
from xr_utils import transform_xr_pose, is_valid_quaternion
class HeadTracker:
"""Extracts yaw/pitch from HMD pose relative to a calibrated neutral orientation."""
def __init__(self, yaw_limit: float = 1.57, pitch_limit: float = 0.52):
self.yaw_limit = yaw_limit # ±90°
self.pitch_limit = pitch_limit # ±30°
self.neutral_rot: R | None = None
def reset(self):
"""Clear neutral calibration; next call to get_targets() will re-calibrate."""
self.neutral_rot = None
def get_targets(self, xr_client) -> np.ndarray:
"""Return [yaw, pitch] in radians relative to neutral HMD pose.
First call after reset() records neutral orientation and returns zeros.
"""
try:
raw_pose = xr_client.get_pose("headset")
if not is_valid_quaternion(raw_pose[3:]):
return np.zeros(2, dtype=np.float32)
_, head_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
head_rot = R.from_quat(head_world_quat_xyzw)
if self.neutral_rot is None:
self.neutral_rot = head_rot
return np.zeros(2, dtype=np.float32)
rel_rot = head_rot * self.neutral_rot.inv()
euler_zyx = rel_rot.as_euler("ZYX")
yaw = float(np.clip(euler_zyx[0], -self.yaw_limit, self.yaw_limit))
pitch = float(np.clip(euler_zyx[1], -self.pitch_limit, self.pitch_limit))
return np.array([yaw, pitch], dtype=np.float32)
except Exception:
return np.zeros(2, dtype=np.float32)

View File

@@ -0,0 +1,60 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# SPDX-License-Identifier: BSD-3-Clause
"""Single-arm XR teleoperation agent."""
from __future__ import annotations
import torch
from .base_agent import BaseTeleopAgent
from .xr_controller import XrTeleopController
from .frame_utils import convert_action_world_to_root
class SingleArmXrAgent(BaseTeleopAgent):
"""Single-arm teleoperation: one XR controller -> one robot arm + gripper.
Action: arm(7) | gripper(1) = 8D
"""
def __init__(self, env, simulation_app, *,
arm: str = "left",
pos_sensitivity: float = 1.0,
rot_sensitivity: float = 0.3,
xr_client=None,
debug_viewports: bool = True):
super().__init__(env, simulation_app, debug_viewports=debug_viewports)
self.xr_client = xr_client
self.teleop = XrTeleopController(
arm=arm, pos_sensitivity=pos_sensitivity,
rot_sensitivity=rot_sensitivity, xr_client=xr_client,
)
self.teleop.add_callback("RESET", self.request_reset)
self._last_root = None
def _ik_action_term_names(self) -> list[str]:
return ["arm_action"]
def on_reset(self):
self.teleop.reset()
self._last_root = None
def assemble_action(self, obs) -> torch.Tensor:
policy_obs = obs["policy"]
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
action = self.teleop.advance(current_eef_pos=eef_pos, current_eef_quat=eef_quat)
robot = self.env.unwrapped.scene["robot"]
if self.teleop.grip_active or self._last_root is None:
self._last_root = convert_action_world_to_root(action, robot)[:7].clone()
# arm(7) | gripper(1)
return torch.cat([self._last_root, action[7:8]])
def cleanup(self):
self.teleop.close()
super().cleanup()

View File

@@ -0,0 +1,82 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Sim-to-VR H264 stereo streaming manager."""
import logging
logger = logging.getLogger(__name__)
class StreamingManager:
"""Manages stereo camera → H264 → TCP streaming to VR headset.
Wraps SimVideoStreamer with graceful error handling and auto-disable on failure.
"""
def __init__(self, host: str, port: int, scene, bitrate: int = 20_000_000):
self._streamer = None
self._enabled = False
if "vr_left_eye" not in scene.sensors:
logger.warning("[Stream] vr_left_eye camera not found. Streaming disabled.")
return
try:
from mindbot.utils.sim_video_streamer import SimVideoStreamer
cam = scene["vr_left_eye"]
self._streamer = SimVideoStreamer(
host=host, port=port,
width=cam.cfg.width, height=cam.cfg.height,
fps=30, bitrate=bitrate,
)
if self._streamer.connect():
self._enabled = True
print(f"[INFO] Sim stereo streaming to {host}:{port} "
f"({cam.cfg.width * 2}x{cam.cfg.height} SBS @ 30fps)")
else:
logger.error("[Stream] Failed to connect. Streaming disabled.")
self._streamer = None
except ImportError:
logger.error("[Stream] mindbot.utils.sim_video_streamer not found. pip install av?")
except Exception as e:
logger.error(f"[Stream] Init failed: {e}")
@property
def enabled(self) -> bool:
return self._enabled
_debug_saved = False
def send(self, scene, frame_count: int = 0) -> None:
"""Send one stereo frame from scene cameras. Auto-disables on failure."""
if not self._enabled:
return
try:
left_rgb = scene["vr_left_eye"].data.output["rgb"][0].cpu().numpy()
right_rgb = scene["vr_right_eye"].data.output["rgb"][0].cpu().numpy()
# Save debug images once (frame 5 to ensure rendering is stable)
if frame_count == 150 and not self._debug_saved:
self._debug_saved = True
try:
from PIL import Image
import numpy as np
Image.fromarray(left_rgb[..., :3]).save("/tmp/vr_left_debug.png")
Image.fromarray(right_rgb[..., :3]).save("/tmp/vr_right_debug.png")
sbs = np.concatenate([left_rgb, right_rgb], axis=1)
Image.fromarray(sbs[..., :3]).save("/tmp/vr_sbs_debug.png")
print(f"[STREAM DIAG] Saved debug frames: left={left_rgb.shape}, right={right_rgb.shape}")
except Exception as e:
print(f"[STREAM DIAG] Save failed: {e}")
if not self._streamer.send_frame(left_rgb, right_rgb):
logger.warning("[Stream] Connection lost. Disabling streaming.")
self._enabled = False
except Exception as e:
if frame_count % 300 == 0:
logger.warning(f"[Stream] Frame error: {e}")
def close(self):
if self._streamer is not None:
self._streamer.close()

View File

@@ -0,0 +1,191 @@
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""XR teleoperation controller with absolute IK target accumulation and grip clutch."""
from collections.abc import Callable
import numpy as np
import torch
from scipy.spatial.transform import Rotation as R
from xr_utils import XrClient, transform_xr_pose, quat_diff_as_rotvec_xyzw, is_valid_quaternion
from .frame_utils import quat_wxyz_to_rotation, rotation_to_quat_wxyz
class XrTeleopController:
"""Teleop controller for PICO XR headset (absolute IK mode).
Accumulates an absolute EEF target in Isaac Sim world frame.
Grip button acts as clutch; B/Y buttons trigger environment reset.
"""
def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3,
max_pos_per_step=0.05, max_rot_per_step=0.08, xr_client=None):
self.xr_client = xr_client if xr_client is not None else XrClient()
self._owns_client = xr_client is None
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
self.max_pos_per_step = max_pos_per_step
self.max_rot_per_step = max_rot_per_step
self.arm = arm
self.controller_name = "left_controller" if arm == "left" else "right_controller"
self.grip_name = "left_grip" if arm == "left" else "right_grip"
self.trigger_name = "left_trigger" if arm == "left" else "right_trigger"
from xr_utils.geometry import R_HEADSET_TO_WORLD
self.R_headset_world = R_HEADSET_TO_WORLD
self.prev_xr_pos = None
self.prev_xr_quat = None
self.target_eef_pos = None
self.target_eef_quat = None
self.grip_active = False
self.frame_count = 0
self.reset_button_latched = False
self.require_grip_reengage = False
self.grip_engage_threshold = 0.8
self.grip_release_threshold = 0.2
self.callbacks = {}
self._root_yaw_rad = 0.0 # current chassis heading, updated each frame
def add_callback(self, name: str, func: Callable):
self.callbacks[name] = func
def reset(self):
self.prev_xr_pos = None
self.prev_xr_quat = None
self.grip_active = False
self.frame_count = 0
self.target_eef_pos = None
self.target_eef_quat = None
self.require_grip_reengage = True
def set_root_yaw(self, yaw_rad: float):
"""Update the chassis heading so XR deltas are in body frame."""
self._root_yaw_rad = yaw_rad
def close(self):
if self._owns_client:
self.xr_client.close()
def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Return 8D hold action [x, y, z, qw, qx, qy, qz, gripper] at frozen target pose."""
action = torch.zeros(8)
if self.target_eef_pos is not None and self.target_eef_quat is not None:
action[:3] = torch.tensor(self.target_eef_pos.copy())
action[3:7] = torch.tensor(self.target_eef_quat.copy())
elif current_eef_pos is not None and current_eef_quat is not None:
action[:3] = torch.tensor(current_eef_pos.copy())
action[3:7] = torch.tensor(current_eef_quat.copy())
else:
action[3] = 1.0
action[7] = 1.0 if trigger > 0.5 else -1.0
return action
def advance(self, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Read XR controller and return 8D absolute action [x, y, z, qw, qx, qy, qz, gripper].
Position and quaternion are in Isaac Sim world frame.
Caller must convert to robot root frame before sending to IK.
"""
try:
if self.arm == "left":
reset_pressed = self.xr_client.get_button("Y")
else:
reset_pressed = False
if reset_pressed and not self.reset_button_latched:
if "RESET" in self.callbacks:
self.callbacks["RESET"]()
self.reset_button_latched = reset_pressed
except Exception:
pass
try:
raw_pose = self.xr_client.get_pose(self.controller_name)
grip = self.xr_client.get_key_value(self.grip_name)
trigger = self.xr_client.get_key_value(self.trigger_name)
except Exception:
return self.get_zero_action(0.0, current_eef_pos, current_eef_quat)
if not is_valid_quaternion(raw_pose[3:]):
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
if self.require_grip_reengage:
if grip <= self.grip_release_threshold:
self.require_grip_reengage = False
else:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
grip_pressed = grip > self.grip_release_threshold if self.grip_active else grip >= self.grip_engage_threshold
if not grip_pressed:
self.prev_xr_pos = None
self.prev_xr_quat = None
if self.grip_active or self.target_eef_pos is None:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = False
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
if not self.grip_active:
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = True
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
xr_world_pos, xr_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
prev_xr_world_pos, prev_xr_world_quat_xyzw = transform_xr_pose(self.prev_xr_pos, self.prev_xr_quat)
world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity
# In VR teleop, user "inhabits" the robot: their physical forward (XR +X)
# maps directly to robot forward (body +X). No rotation needed — the VR view
# already rotates with the robot, so XR deltas naturally align with body frame.
# Diagnostic
if self.frame_count % 30 == 0 and self.target_eef_pos is not None:
print(f"[XR {self.arm}] delta=({world_delta_pos[0]:+.4f},{world_delta_pos[1]:+.4f},{world_delta_pos[2]:+.4f}) "
f"target=({self.target_eef_pos[0]:.3f},{self.target_eef_pos[1]:.3f},{self.target_eef_pos[2]:.3f})")
pos_norm = np.linalg.norm(world_delta_pos)
if pos_norm > self.max_pos_per_step:
world_delta_pos *= self.max_pos_per_step / pos_norm
world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity
rot_norm = np.linalg.norm(world_delta_rot)
if rot_norm > self.max_rot_per_step:
world_delta_rot *= self.max_rot_per_step / rot_norm
gripper_action = 1.0 if trigger > 0.5 else -1.0
if self.target_eef_pos is None:
self.target_eef_pos = np.zeros(3)
self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])
self.target_eef_pos += world_delta_pos
target_r = quat_wxyz_to_rotation(self.target_eef_quat)
self.target_eef_quat = rotation_to_quat_wxyz(R.from_rotvec(world_delta_rot) * target_r)
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
self.frame_count += 1
action = torch.tensor([
*self.target_eef_pos,
*self.target_eef_quat,
gripper_action,
], dtype=torch.float32)
return action

View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""Standalone test for SimVideoStreamer: generates a color-cycling SBS test pattern
and streams it to a TCP server (e.g. PICO VR headset).
Usage:
# 1. On PICO: select SIM-Stereo, click Listen
# 2. On PC:
python scripts/test_sim_streamer.py --host 172.20.103.171 --port 12345
The PICO should display a side-by-side test pattern:
left half = cycling red, right half = cycling blue.
"""
import argparse
import time
import numpy as np
def main():
parser = argparse.ArgumentParser(description="Test SimVideoStreamer with synthetic frames")
parser.add_argument("--host", type=str, required=True, help="PICO IP address (TCP server)")
parser.add_argument("--port", type=int, default=12345, help="TCP port")
parser.add_argument("--width", type=int, default=960, help="Eye width")
parser.add_argument("--height", type=int, default=540, help="Eye height")
parser.add_argument("--fps", type=int, default=30, help="Target FPS")
parser.add_argument("--duration", type=int, default=30, help="Stream duration in seconds")
args = parser.parse_args()
# Direct import to avoid pulling in the full mindbot/isaaclab dependency chain
import importlib.util, pathlib
_spec = importlib.util.spec_from_file_location(
"sim_video_streamer",
pathlib.Path(__file__).resolve().parents[1] / "source/mindbot/mindbot/utils/sim_video_streamer.py",
)
_mod = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
SimVideoStreamer = _mod.SimVideoStreamer
streamer = SimVideoStreamer(
host=args.host,
port=args.port,
width=args.width,
height=args.height,
fps=args.fps,
)
print(f"Connecting to {args.host}:{args.port}...")
if not streamer.connect():
print("Failed to connect. Is PICO listening?")
return
print(f"Streaming {args.width*2}x{args.height} SBS test pattern for {args.duration}s...")
frame_interval = 1.0 / args.fps
start_time = time.time()
frame_count = 0
try:
while time.time() - start_time < args.duration:
t = time.time() - start_time
# Generate test pattern: cycling colors
# Left eye: red gradient with horizontal bars
left = np.zeros((args.height, args.width, 3), dtype=np.uint8)
red_val = int(128 + 127 * np.sin(t * 2.0))
left[:, :, 0] = red_val # R channel
# Add horizontal position indicator
bar_y = int((args.height - 20) * (0.5 + 0.5 * np.sin(t * 1.5)))
left[bar_y:bar_y+20, :, :] = 255
# Right eye: blue gradient with horizontal bars
right = np.zeros((args.height, args.width, 3), dtype=np.uint8)
blue_val = int(128 + 127 * np.sin(t * 2.0 + 1.0))
right[:, :, 2] = blue_val # B channel
right[bar_y:bar_y+20, :, :] = 255 # same bar position for stereo reference
if not streamer.send_frame(left, right):
print("Connection lost.")
break
frame_count += 1
# Maintain frame rate
elapsed = time.time() - start_time
expected = frame_count * frame_interval
if expected > elapsed:
time.sleep(expected - elapsed)
except KeyboardInterrupt:
print("\nStopped by user.")
elapsed = time.time() - start_time
print(f"Sent {frame_count} frames in {elapsed:.1f}s ({frame_count/elapsed:.1f} fps)")
streamer.close()
if __name__ == "__main__":
main()

View File

@@ -13,7 +13,7 @@ The following configurations are available:
"""
import isaaclab.sim as sim_utils
from isaaclab.actuators import ImplicitActuatorCfg
from isaaclab.actuators import IdealPDActuatorCfg, ImplicitActuatorCfg
from isaaclab.assets import ArticulationCfg
from mindbot.utils.assets import MINDBOT_ASSETS_DIR
@@ -21,7 +21,7 @@ from mindbot.utils.assets import MINDBOT_ASSETS_DIR
MINDBOT_CFG = ArticulationCfg(
spawn=sim_utils.UsdFileCfg(
# usd_path=f"{MINDBOT_ASSETS_DIR}/robots/mindrobot_2i/mindrobot_2i.usd",
usd_path=f"{MINDBOT_ASSETS_DIR}/robots/mindrobot_2i/contact_sensor.usd",
usd_path=f"{MINDBOT_ASSETS_DIR}/robots/mindbot_zed/mindbot_mini.usd",
activate_contact_sensors=True,
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=False,
@@ -29,7 +29,7 @@ MINDBOT_CFG = ArticulationCfg(
),
articulation_props=sim_utils.ArticulationRootPropertiesCfg(
fix_root_link=False,
enabled_self_collisions=True,
enabled_self_collisions=False, # TODO: re-enable after fixing trunk self-collision
solver_position_iteration_count=8,
solver_velocity_iteration_count=0,
),
@@ -58,11 +58,14 @@ MINDBOT_CFG = ArticulationCfg(
"right_hand_joint_left": 0.0,
"right_hand_joint_right": 0.0,
# ====== Trunk & Head ======
"PrismaticJoint": 0.26,
# NOTE: pos z = 0.44 (base clearance) + PrismaticJoint value.
# If you change PrismaticJoint, update pos z accordingly to avoid
# geometry clipping that causes the robot to fall on spawn.
"PrismaticJoint": 0.2,
"head_pitch_Joint": 0.0,
"head_yaw_Joint": 0.0,
},
pos=(0.0, 0.0, 0.7),
pos=(0.001, 0.001, 0.7), # z = 0.44 (base clearance) + 0.26 (PrismaticJoint)
),
actuators={
# RM-65 J1-J3: shoulder/elbow — max 180°/s ≈ 3.14 rad/s
@@ -104,12 +107,17 @@ MINDBOT_CFG = ArticulationCfg(
stiffness=80.0,
damping=4.0,
),
"trunk": ImplicitActuatorCfg(
# IdealPDActuator: explicitly computes F = k_p*(target-pos) + k_d*(target_vel-vel)
# and sends via set_dof_actuation_forces(), bypassing PhysX drive which
# doesn't work correctly for prismatic joints with set_dof_position_targets().
# NOTE: trunk gravity disabled by startup event. High stiffness to resist
# arm reaction forces (~150N) — deflection < 2mm at 100k stiffness.
"trunk": IdealPDActuatorCfg(
joint_names_expr=["PrismaticJoint"],
effort_limit_sim=200.0,
velocity_limit_sim=0.2,
stiffness=1000.0,
damping=100.0,
effort_limit=2000.0,
velocity_limit=0.5,
stiffness=100000.0,
damping=5000.0,
),
"wheels": ImplicitActuatorCfg(
# joint_names_expr=[".*_revolute_Joint"],
@@ -168,6 +176,8 @@ MINDBOT_HEAD_JOIONTS =[
"head_pitch_Joint"
]
MINDBOT_TRUNK_JOINT = "PrismaticJoint"
# EEF body names (as defined in the USD asset)
MINDBOT_LEFT_EEF_BODY = "left_hand_body"
MINDBOT_RIGHT_EEF_BODY = "right_hand_body"

View File

@@ -30,6 +30,7 @@ from isaaclab.managers import TerminationTermCfg as DoneTerm
from isaaclab.markers.config import FRAME_MARKER_CFG
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sensors.frame_transformer.frame_transformer_cfg import FrameTransformerCfg, OffsetCfg
from isaaclab.sensors import CameraCfg
from isaaclab.sim.schemas.schemas_cfg import RigidBodyPropertiesCfg
from isaaclab.sim.spawners.from_files.from_files_cfg import GroundPlaneCfg
from isaaclab.sim.spawners.materials import RigidBodyMaterialCfg
@@ -48,6 +49,7 @@ from .mindrobot_2i_cfg import (
MINDBOT_RIGHT_GRIPPER_JOINTS,
MINDBOT_WHEEL_JOINTS,
MINDBOT_HEAD_JOIONTS,
MINDBOT_TRUNK_JOINT,
MINDBOT_LEFT_ARM_PRIM_ROOT,
MINDBOT_RIGHT_ARM_PRIM_ROOT,
MINDBOT_LEFT_EEF_PRIM,
@@ -96,6 +98,28 @@ class MindRobotDualArmSceneCfg(InteractiveSceneCfg):
ee_frame: FrameTransformerCfg = None
ee_frame_right: FrameTransformerCfg = None
# VR stereo cameras — reuse existing ZED_X CameraLeft/CameraRight prims from USD
# (spawn=None means use existing prim, don't create a new one)
# The ZED_X baseline in USD is 120mm; for perfect VR IPD we'd ideally set 65mm,
# but reusing existing prims avoids spawn issues. stereoOffset=0 in video_source.yml
# since sim rendering gives correct perspective regardless of physical baseline.
vr_left_eye: CameraCfg = CameraCfg(
prim_path="{ENV_REGEX_NS}/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft",
update_period=1 / 30,
height=1440, # 2560x1440 per eye → SBS 5120x1440; effective ~1920x1440 after shader crop
width=2560,
data_types=["rgb"],
spawn=None,
)
vr_right_eye: CameraCfg = CameraCfg(
prim_path="{ENV_REGEX_NS}/Robot/robot_head/ZED_X/base_link/ZED_X/CameraRight",
update_period=1 / 30,
height=1440,
width=2560,
data_types=["rgb"],
spawn=None,
)
# =====================================================================
# Actions Configuration
@@ -166,6 +190,13 @@ class MindRobotDualArmActionsCfg:
scale=1.0,
)
trunk_action = JointPositionActionCfg(
asset_name="robot",
joint_names=[MINDBOT_TRUNK_JOINT],
scale=1.0,
use_default_offset=False, # action = absolute target, not offset from default
)
# =====================================================================
# Observations Configuration
@@ -236,12 +267,29 @@ def _disable_arm_gravity(env, env_ids: torch.Tensor):
schemas.modify_rigid_body_properties(arm_path, arm_cfg)
def _disable_trunk_head_gravity(env, env_ids: torch.Tensor):
"""Disable gravity for trunk and head at startup."""
import isaaclab.sim.schemas as schemas
cfg = RigidBodyPropertiesCfg(disable_gravity=True)
for env_id in range(env.num_envs):
for prim_path in [
f"/World/envs/env_{env_id}/Robot/robot_trunk",
f"/World/envs/env_{env_id}/Robot/robot_head",
]:
schemas.modify_rigid_body_properties(prim_path, cfg)
@configclass
class MindRobotDualArmEventsCfg:
disable_arm_gravity = EventTerm(
func=_disable_arm_gravity,
mode="startup",
)
disable_trunk_head_gravity = EventTerm(
func=_disable_trunk_head_gravity,
mode="startup",
)
reset_scene = EventTerm(
func=mdp.reset_scene_to_default,
mode="reset",
@@ -292,7 +340,7 @@ class MindRobot2iDualArmIKAbsEnvCfg(ManagerBasedRLEnvCfg):
lookat=(0.0, 0.0, 0.8), # 瞄准点(机器人腰部高度附近)
origin_type="world",
)
self.decimation = 2
self.decimation = 2 # 25 Hz control (was 2 = 50 Hz, unnecessary for teleop)
self.episode_length_s = 50.0
self.sim.dt = 0.01 # 100 Hz
self.sim.render_interval = 2

View File

@@ -56,10 +56,13 @@ MINDBOT_CFG = ArticulationCfg(
"right_hand_joint_left": 0.0,
"right_hand_joint_right": 0.0,
# ====== Trunk & Head ======
# NOTE: pos z = 0.44 (base clearance) + PrismaticJoint value.
# If you change PrismaticJoint, update pos z accordingly to avoid
# geometry clipping that causes the robot to fall on spawn.
"PrismaticJoint": 0.26,
"head_revoluteJoint": 0.0,
},
pos=(0.0, 0.0, 0.7),
pos=(0.0, 0.0, 0.7), # z = 0.44 (base clearance) + 0.26 (PrismaticJoint)
),
actuators={
# RM-65 J1-J3: shoulder/elbow — max 180°/s ≈ 3.14 rad/s

View File

@@ -283,7 +283,7 @@ class MindRobotDualArmIKAbsEnvCfg(ManagerBasedRLEnvCfg):
lookat=(0.0, 0.0, 0.8), # 瞄准点(机器人腰部高度附近)
origin_type="world",
)
self.decimation = 2
self.decimation = 2 # 25 Hz control (was 2 = 50 Hz, unnecessary for teleop)
self.episode_length_s = 50.0
self.sim.dt = 0.01 # 100 Hz
self.sim.render_interval = 2

View File

@@ -20,7 +20,7 @@ import os
##
MINDBOT_ASSETS_DIR: str = os.environ.get(
"MINDBOT_ASSETS_DIR",
# "/home/tangger/LYT/maic_usd_assets_moudle",
"/home/maic/xh/maic_usd_assets_moudle"
"/home/tangger/LYT/maic_usd_assets_moudle",
# "/home/maic/xh/maic_usd_assets_moudle"
# "/home/maic/LYT/maic_usd_assets_moudle",
)

View File

@@ -0,0 +1,238 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# SPDX-License-Identifier: BSD-3-Clause
"""Simulation stereo video streamer for VR headset.
Encodes stereo RGB frames (left + right eye) as H264 side-by-side video
and streams to the Unity VR client over TCP, using the same 4-byte
big-endian length-prefixed NALU protocol as RobotVisionConsole.
Usage:
streamer = SimVideoStreamer("172.20.103.171", 12345, width=960, height=540)
streamer.connect()
# In the sim loop:
streamer.send_frame(left_rgb_np, right_rgb_np)
# Cleanup:
streamer.close()
The Unity client (PICO) acts as TCP SERVER on the specified port.
This streamer connects as TCP CLIENT — same role as RobotVisionConsole.
"""
from __future__ import annotations
import logging
import socket
import struct
import threading
import time
from typing import Optional
import av
import numpy as np
logger = logging.getLogger(__name__)
class SimVideoStreamer:
"""H264 stereo video streamer over TCP for VR headset display.
Accepts left/right RGB numpy arrays, assembles a side-by-side frame,
encodes to H264 with PyAV, and sends each NALU packet prefixed with
a 4-byte big-endian length header over a TCP socket.
"""
def __init__(
self,
host: str,
port: int = 12345,
width: int = 960,
height: int = 540,
fps: int = 30,
bitrate: int = 20_000_000,
):
"""Initialize the streamer.
Args:
host: IP address of the PICO VR headset (TCP server).
port: TCP port the PICO is listening on.
width: Width of EACH eye image (SBS output = width*2 x height).
height: Height of each eye image.
fps: Target frame rate for H264 encoding.
bitrate: H264 encoding bitrate in bits per second.
"""
self.host = host
self.port = port
self.eye_width = width
self.eye_height = height
self.sbs_width = width * 2
self.sbs_height = height
self.fps = fps
self.bitrate = bitrate
self._sock: Optional[socket.socket] = None
self._codec_ctx: Optional[av.CodecContext] = None
self._frame_idx = 0
self._last_frame_time = 0.0
self._min_frame_interval = 1.0 / fps
# Async encoding: background thread picks up frames from _pending_frame
self._pending_frame: Optional[np.ndarray] = None
self._frame_lock = threading.Lock()
self._encode_thread: Optional[threading.Thread] = None
self._running = False
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def connect(self) -> bool:
"""Connect to the PICO VR headset TCP server.
Returns:
True if connection succeeded.
"""
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._sock.connect((self.host, self.port))
logger.info(f"[SimVideoStreamer] Connected to {self.host}:{self.port}")
except OSError as e:
logger.error(f"[SimVideoStreamer] TCP connect failed: {e}")
self._sock = None
return False
self._init_encoder()
self._frame_idx = 0
self._last_frame_time = 0.0
# Start background encode+send thread
self._running = True
self._encode_thread = threading.Thread(target=self._encode_loop, daemon=True)
self._encode_thread.start()
return True
def send_frame(self, left_rgb: np.ndarray, right_rgb: np.ndarray) -> bool:
"""Submit a stereo frame for async encoding and sending.
This method is NON-BLOCKING: it copies the frame data and returns
immediately. The background thread handles encoding and TCP send.
If the previous frame hasn't been encoded yet, this frame replaces it
(latest-frame-wins, no queue buildup).
Args:
left_rgb: Left eye image, shape (H, W, 3), dtype uint8, RGB order.
right_rgb: Right eye image, shape (H, W, 3), dtype uint8, RGB order.
Returns:
True if frame was accepted. False if connection is lost.
"""
if not self._running:
return False
# Assemble SBS and hand off to background thread (latest-frame-wins)
sbs = np.concatenate([left_rgb, right_rgb], axis=1) # (H, W*2, 3)
with self._frame_lock:
self._pending_frame = sbs
return True
def close(self):
"""Stop background thread, flush encoder, and close connection."""
self._running = False
if self._encode_thread is not None:
self._encode_thread.join(timeout=2.0)
self._encode_thread = None
if self._codec_ctx is not None:
try:
packets = self._codec_ctx.encode(None)
if self._sock is not None:
for pkt in packets:
nalu_data = bytes(pkt)
header = struct.pack(">I", len(nalu_data))
self._sock.sendall(header + nalu_data)
except Exception:
pass
self._codec_ctx = None
if self._sock is not None:
try:
self._sock.close()
except Exception:
pass
self._sock = None
logger.info("[SimVideoStreamer] Connection closed.")
# ------------------------------------------------------------------
# Private
# ------------------------------------------------------------------
def _init_encoder(self):
"""Initialize H264 encoder via PyAV."""
codec = av.Codec("libx264", "w")
self._codec_ctx = codec.create()
self._codec_ctx.width = self.sbs_width
self._codec_ctx.height = self.sbs_height
self._codec_ctx.pix_fmt = "yuv420p"
from fractions import Fraction
self._codec_ctx.time_base = Fraction(1, self.fps)
self._codec_ctx.framerate = Fraction(self.fps, 1)
self._codec_ctx.bit_rate = self.bitrate
self._codec_ctx.gop_size = self.fps # keyframe every 1 second
self._codec_ctx.max_b_frames = 0 # no B-frames for low latency
self._codec_ctx.thread_count = 2
# Low-latency encoding options
self._codec_ctx.options = {
"preset": "ultrafast",
"tune": "zerolatency",
"profile": "baseline", # widest decoder compatibility
}
self._codec_ctx.open()
logger.info(
f"[SimVideoStreamer] H264 encoder initialized: "
f"{self.sbs_width}x{self.sbs_height} @ {self.fps}fps, "
f"bitrate={self.bitrate/1e6:.1f}Mbps"
)
def _encode_loop(self):
"""Background thread: encode and send frames at target fps."""
while self._running:
# Grab the latest pending frame (if any)
with self._frame_lock:
sbs = self._pending_frame
self._pending_frame = None
if sbs is None:
time.sleep(0.001) # 1ms idle wait
continue
# Rate limit encoding to target fps
now = time.monotonic()
if now - self._last_frame_time < self._min_frame_interval:
continue
self._last_frame_time = now
# Encode
frame = av.VideoFrame.from_ndarray(sbs, format="rgb24")
frame.pts = self._frame_idx
self._frame_idx += 1
try:
packets = self._codec_ctx.encode(frame)
except av.error.FFmpegError as e:
logger.warning(f"[SimVideoStreamer] Encode error: {e}")
continue
# Send each packet
for pkt in packets:
nalu_data = bytes(pkt)
header = struct.pack(">I", len(nalu_data))
try:
self._sock.sendall(header + nalu_data)
except (BrokenPipeError, ConnectionResetError, OSError) as e:
logger.error(f"[SimVideoStreamer] Send failed: {e}")
self._running = False
return