#!/usr/bin/env python3
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""
Script to run teleoperation with Isaac Lab manipulation environments using PICO XR Controllers.

This script uses XRoboToolkit to fetch XR controller poses and maps them to differential IK actions.
"""

import argparse
import logging
import sys
import os
from collections.abc import Callable

# Ensure xr_utils (next to this script) is importable when running directly
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from isaaclab.app import AppLauncher

logger = logging.getLogger(__name__)

# add argparse arguments
parser = argparse.ArgumentParser(
    description="Teleoperation for Isaac Lab environments with PICO XR Controller."
)
parser.add_argument(
    "--num_envs", type=int, default=1, help="Number of environments to simulate."
)
parser.add_argument(
    "--task",
    type=str,
    default="Isaac-MindRobot-LeftArm-IK-Rel-v0",
    help="Name of the task.",
)
# Scales both translation and rotation deltas from the controller.
parser.add_argument(
    "--sensitivity", type=float, default=5.0, help="Sensitivity factor for pos/rot."
)
parser.add_argument(
    "--arm",
    type=str,
    default="left",
    choices=["left", "right"],
    help="Which arm/controller to use.",
)
# Base (wheel) drive scaling for the left joystick.
parser.add_argument(
    "--base_speed",
    type=float,
    default=3.0,
    help="Max wheel speed (rad/s) for joystick full forward.",
)
parser.add_argument(
    "--base_turn",
    type=float,
    default=2.0,
    help="Max wheel differential (rad/s) for joystick full left/right.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
app_launcher_args = vars(args_cli)

# Disable some rendering settings to speed up
# NOTE: the built-in Isaac XR mode is disabled; XR input is read via xr_utils instead.
app_launcher_args["xr"] = False

# launch omniverse app (must happen before importing gym/torch-side Isaac modules)
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app

"""Rest everything follows."""

import gymnasium as gym
import numpy as np
import torch
from scipy.spatial.transform import Rotation as R

from isaaclab.envs import ManagerBasedRLEnvCfg

import isaaclab_tasks  # noqa: F401
import mindbot.tasks  # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg

from xr_utils import XrClient, transform_xr_pose, quat_diff_as_rotvec_xyzw, is_valid_quaternion
from xr_utils.geometry import R_HEADSET_TO_WORLD


# =====================================================================
# Teleoperation Interface for XR
# =====================================================================


def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
    """Convert Isaac-style wxyz quaternion to scipy Rotation.

    scipy expects xyzw ordering, so the components are reordered here.
    """
    return R.from_quat([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])


def _rotation_to_quat_wxyz(rot: R) -> np.ndarray:
    """Convert scipy Rotation quaternion to Isaac-style wxyz."""
    # scipy returns xyzw; move w to the front.
    quat_xyzw = rot.as_quat()
    return np.array([quat_xyzw[3], quat_xyzw[0], quat_xyzw[1], quat_xyzw[2]])


def world_pose_to_root_frame(
    pos_w: np.ndarray,
    quat_wxyz: np.ndarray,
    root_pos_w: np.ndarray,
    root_quat_wxyz: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Express a world-frame pose in the robot root frame.

    Args:
        pos_w: Position in world frame (3,).
        quat_wxyz: Orientation in world frame, wxyz ordering (4,).
        root_pos_w: Robot root position in world frame (3,).
        root_quat_wxyz: Robot root orientation in world frame, wxyz (4,).

    Returns:
        Tuple of (position, wxyz quaternion) expressed in the root frame.
    """
    root_rot = _quat_wxyz_to_rotation(root_quat_wxyz)
    pose_rot = _quat_wxyz_to_rotation(quat_wxyz)
    # Inverse root transform: rotate the world-frame offset into the root frame.
    pos_root = root_rot.inv().apply(pos_w - root_pos_w)
    quat_root = _rotation_to_quat_wxyz(root_rot.inv() * pose_rot)
    return pos_root, quat_root


class XrTeleopController:
    """Teleop controller for PICO XR headset.

    Reads one controller's pose/buttons via ``XrClient`` and converts motion into
    either relative (7D delta) or absolute (8D pose) IK actions. The grip button
    acts as a clutch: motion is only accumulated while it is held.
    """

    def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3, is_absolute=False):
        self.xr_client = XrClient()
        self.pos_sensitivity = pos_sensitivity
        self.rot_sensitivity = rot_sensitivity
        self.arm = arm
        self.is_absolute = is_absolute
        # Per-arm input channel names as exposed by XrClient.
        self.controller_name = "left_controller" if arm == "left" else "right_controller"
        self.grip_name = "left_grip" if arm == "left" else "right_grip"
        self.trigger_name = "left_trigger" if arm == "left" else "right_trigger"

        # Coordinate transform matrix
        self.R_headset_world = R_HEADSET_TO_WORLD

        # Raw XR tracking space poses
        self.prev_xr_pos = None
        self.prev_xr_quat = None

        # Absolute target states
        self.target_eef_pos = None
        self.target_eef_quat = None  # wxyz

        self.grip_active = False
        self.frame_count = 0
        self.reset_button_latched = False
        self.require_grip_reengage = False
        # Hysteresis thresholds for the analog grip value (engage high, release low).
        self.grip_engage_threshold = 0.8
        self.grip_release_threshold = 0.2

        # Callbacks (like reset, etc)
        self.callbacks = {}

    def add_callback(self, name: str, func: Callable):
        # Register a callback fired from advance() (e.g. name "RESET").
        self.callbacks[name] = func

    def reset(self):
        # Clear all clutch/target state so stale motion is not re-applied.
        self.prev_xr_pos = None
        self.prev_xr_quat = None
        self.grip_active = False
        self.frame_count = 0
        self.target_eef_pos = None
        self.target_eef_quat = None
        # Require one grip release after reset so stale controller motion
        # cannot immediately drive the robot back toward the previous pose.
        self.require_grip_reengage = True

    def close(self):
        # Release the underlying XR client connection.
        self.xr_client.close()

    def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None):
        """Return a "hold still" action for the current mode.

        Absolute mode: 8D [x, y, z, qw, qx, qy, qz, gripper] pinned to the
        current (or cached) end-effector pose. Relative mode: 7D zero delta.
        The gripper channel still follows the trigger so grasping works while idle.
        """
        if self.is_absolute:
            action = torch.zeros(8)
            # Stay at the current valid pose, or fallback to the cached target
            if current_eef_pos is not None and current_eef_quat is not None:
                action[:3] = torch.tensor(current_eef_pos.copy())
                action[3:7] = torch.tensor(current_eef_quat.copy())
            elif self.target_eef_pos is not None and self.target_eef_quat is not None:
                action[:3] = torch.tensor(self.target_eef_pos.copy())
                action[3:7] = torch.tensor(self.target_eef_quat.copy())
            else:
                action[3] = 1.0  # default w=1 for quat
            action[7] = 1.0 if trigger > 0.5 else -1.0
        else:
            action = torch.zeros(7)
            action[6] = 1.0 if trigger > 0.5 else -1.0
        return action

    def advance(self, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
        """
        Reads the XR controller.
        Relative bounds return 7D action tensor: [dx, dy, dz, drx, dry, drz, gripper]
        Absolute bounds return 8D action tensor: [x, y, z, qw, qx, qy, qz, gripper]

        Note: in absolute mode current_eef_* and the returned target are in WORLD frame.
        The caller is responsible for converting to root frame before sending to IK.
        """
        # XR buttons check (B or Y triggers the RESET callback, edge-triggered via latch)
        try:
            reset_pressed = self.xr_client.get_button("B") or self.xr_client.get_button("Y")
            if reset_pressed and not self.reset_button_latched:
                if "RESET" in self.callbacks:
                    self.callbacks["RESET"]()
            self.reset_button_latched = reset_pressed
        except Exception:
            # Best-effort: button polling failures must not interrupt teleop.
            pass

        try:
            raw_pose = self.xr_client.get_pose(self.controller_name)
            grip = self.xr_client.get_key_value(self.grip_name)
            trigger = self.xr_client.get_key_value(self.trigger_name)
        except Exception as e:
            # Pose read failed: hold position with a neutral (open) gripper command.
            return self.get_zero_action(0.0, current_eef_pos, current_eef_quat)

        # Skip transformation if quaternion is invalid
        if not is_valid_quaternion(raw_pose[3:]):
            return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)

        # After a reset, ignore motion until the user fully releases the grip once.
        if self.require_grip_reengage:
            if grip <= self.grip_release_threshold:
                self.require_grip_reengage = False
            else:
                if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
                    self.target_eef_pos = current_eef_pos.copy()
                    self.target_eef_quat = current_eef_quat.copy()
                return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)

        # Use hysteresis so noisy analog grip values do not accidentally re-enable teleop.
        if self.grip_active:
            grip_pressed = grip > self.grip_release_threshold
        else:
            grip_pressed = grip >= self.grip_engage_threshold

        # Grip button acts as a clutch: motion is only applied while it is held.
        if not grip_pressed:
            self.prev_xr_pos = None
            self.prev_xr_quat = None
            self.grip_active = False
            # Ensure target tracks the current pose while we are not grabbing
            if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
                self.target_eef_pos = current_eef_pos.copy()
                self.target_eef_quat = current_eef_quat.copy()  # wxyz
            return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)

        # First frame with the grip engaged: latch reference pose, emit no motion yet.
        if not self.grip_active:
            self.prev_xr_pos = raw_pose[:3].copy()
            self.prev_xr_quat = raw_pose[3:].copy()
            if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
                self.target_eef_pos = current_eef_pos.copy()
                self.target_eef_quat = current_eef_quat.copy()  # wxyz
            self.grip_active = True
            return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)

        # Since OpenXR headset zero is not guaranteed to match robot zero, we compute the
        # raw transformation in World Frame, but apply it relatively to the stored target.

        # 1. First, find current XR pose projected into World frame
        xr_world_pos, xr_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
        prev_xr_world_pos, prev_xr_world_quat_xyzw = transform_xr_pose(self.prev_xr_pos, self.prev_xr_quat)

        # 2. Extract Delta POS in World frame
        world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity

        # 3. Extract Delta ROT in World frame
        world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity

        # 4. Gripper
        gripper_action = 1.0 if trigger > 0.5 else -1.0

        if self.is_absolute:
            # Lazily initialize the target if no EEF pose was ever cached.
            if self.target_eef_pos is None:
                self.target_eef_pos = np.zeros(3)
                self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])

            # Accumulate in world frame so VR direction always matches sim direction.
            self.target_eef_pos += world_delta_pos
            target_r = _quat_wxyz_to_rotation(self.target_eef_quat)
            delta_r = R.from_rotvec(world_delta_rot)
            # Pre-multiply: apply the world-frame delta rotation to the stored target.
            self.target_eef_quat = _rotation_to_quat_wxyz(delta_r * target_r)

            action = torch.tensor([
                self.target_eef_pos[0], self.target_eef_pos[1], self.target_eef_pos[2],
                self.target_eef_quat[0], self.target_eef_quat[1],
                self.target_eef_quat[2], self.target_eef_quat[3],
                gripper_action], dtype=torch.float32)

            self.prev_xr_pos = raw_pose[:3].copy()
            self.prev_xr_quat = raw_pose[3:].copy()
        else:
            # Clamp per-step deltas so controller jumps cannot jerk the arm.
            max_pos_delta = 0.05
            world_pos_norm = np.linalg.norm(world_delta_pos)
            if world_pos_norm > max_pos_delta:
                world_delta_pos = world_delta_pos * (max_pos_delta / world_pos_norm)

            max_rot_delta = 0.15
            world_rot_norm = np.linalg.norm(world_delta_rot)
            if world_rot_norm > max_rot_delta:
                world_delta_rot = world_delta_rot * (max_rot_delta / world_rot_norm)

            action = torch.tensor([
                world_delta_pos[0], world_delta_pos[1], world_delta_pos[2],
                world_delta_rot[0], world_delta_rot[1], world_delta_rot[2],
                gripper_action], dtype=torch.float32)

            self.prev_xr_pos = raw_pose[:3].copy()
            self.prev_xr_quat = raw_pose[3:].copy()

        # Periodic debug dump (every 30 calls).
        self.frame_count += 1
        if self.frame_count % 30 == 0:
            np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
            print("\n====================== [VR TELEOP DEBUG] ======================")
            print(f"| Task Mode: {'ABSOLUTE' if self.is_absolute else 'RELATIVE'}")
            print(f"| Raw VR Pos (OpenXR): {np.array(raw_pose[:3])}")
            print(f"| Raw VR Quat (xyzw): {np.array(raw_pose[3:])}")
            print(f"| XR Delta Pos (world): {world_delta_pos} (norm={np.linalg.norm(world_delta_pos):.4f})")
            print(f"| XR Delta Rot (world): {world_delta_rot} (norm={np.linalg.norm(world_delta_rot):.4f})")
            if self.is_absolute:
                print(f"| Targ Pos (world): {action[:3].numpy()}")
                print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}")
            else:
                print(f"| Sent Action Pos (dx,dy,dz): {action[:3].numpy()}")
                print(f"| Sent Action Rot (rx,ry,rz): {action[3:6].numpy()}")
            print(f"| Gripper & Trigger: Grip={grip:.2f}, Trig={trigger:.2f}")
            print("===============================================================")

        return action


# =====================================================================
# Main Execution Loop
# =====================================================================


def main() -> None:
    """Run teleoperation with PICO XR Controller against Isaac Lab environment."""
    # 1. Configuration parsing
    env_cfg = parse_env_cfg(args_cli.task, num_envs=args_cli.num_envs)
    env_cfg.env_name = args_cli.task

    if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
        raise ValueError(f"Teleoperation requires ManagerBasedRLEnvCfg. Got: {type(env_cfg)}")
    # Disable time-out termination: teleop sessions run until manually reset/closed.
    env_cfg.terminations.time_out = None

    # 2. Environment creation
    try:
        env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
    except Exception as e:
        logger.error(f"Failed to create environment '{args_cli.task}': {e}")
        simulation_app.close()
        return

    # 3. Teleoperation Interface Initialization
    # Mode is inferred from the task name ("Abs" -> absolute IK commands).
    is_abs_mode = "Abs" in args_cli.task
    print(f"\n[INFO] Connecting to PICO XR Headset using {args_cli.arm} controller...")
    print(f"[INFO] Using IK Mode: {'ABSOLUTE' if is_abs_mode else 'RELATIVE'}")

    teleop_interface = XrTeleopController(
        arm=args_cli.arm,
        pos_sensitivity=args_cli.sensitivity,
        rot_sensitivity=args_cli.sensitivity * (1.0 if is_abs_mode else 0.3),  # Absolute rotation handles 1:1 better than relative
        is_absolute=is_abs_mode
    )

    should_reset = False

    def request_reset():
        # Invoked from the XR button callback; the sim loop performs the actual reset.
        nonlocal should_reset
        should_reset = True
        print("[INFO] Reset requested via XR button.")

    teleop_interface.add_callback("RESET", request_reset)

    def get_arm_action_term():
        # NOTE(review): reaches into the action manager's private term dict — assumes
        # the task registers the arm term under "arm_action"; confirm against task cfg.
        return env.action_manager._terms["arm_action"]

    def clear_ik_target_state():
        """Clear the internal IK target so reset does not reuse the previous pose command."""
        if not is_abs_mode:
            return
        arm_action_term = get_arm_action_term()
        # Re-seed the IK desired pose with the current frame pose (body frame).
        ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
        arm_action_term._raw_actions.zero_()
        arm_action_term._processed_actions.zero_()
        arm_action_term._ik_controller._command.zero_()
        arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
        arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b

    def convert_action_world_to_root(action_tensor: torch.Tensor) -> torch.Tensor:
        """Convert an absolute IK action from world frame to robot root frame.

        Only the pose part (indices 0:7) is converted; the gripper channel is kept.
        """
        robot = env.unwrapped.scene["robot"]
        root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy()
        root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy()  # wxyz
        target_pos_w = action_tensor[:3].numpy()
        target_quat_w = action_tensor[3:7].numpy()
        pos_root, quat_root = world_pose_to_root_frame(
            target_pos_w, target_quat_w, root_pos_w, root_quat_w,
        )
        out = action_tensor.clone()
        out[:3] = torch.tensor(pos_root, dtype=torch.float32)
        out[3:7] = torch.tensor(quat_root, dtype=torch.float32)
        return out

    def get_wheel_action() -> torch.Tensor:
        """Read left joystick and return 4-DOF wheel velocity command.

        Skid-steer differential drive.
        Joystick: Y-axis (+1 = forward), X-axis (+1 = right turn).
        Joint order from articulation (terminal log): [right_b, left_b, left_f, right_f]
        Right/left sign convention assumes both sides' joints have the same axis direction
        (positive velocity = forward). If the robot drives backward when pushing forward,
        negate base_speed in the launch command.
        """
        try:
            joy = teleop_interface.xr_client.get_joystick("left")
            jy = float(joy[1])  # forward / backward
            jx = float(joy[0])  # right / left
        except Exception:
            # Joystick unavailable: command the base to stand still.
            return torch.zeros(4)

        v = jy * args_cli.base_speed
        omega = jx * args_cli.base_turn

        # Positive omega = turn right → left wheels faster, right wheels slower
        right_vel = v - omega
        left_vel = v + omega

        return torch.tensor(
            [right_vel, left_vel, left_vel, right_vel], dtype=torch.float32
        )

    # Initial reset so IK target and clutch state start from the spawn pose.
    env.reset()
    clear_ik_target_state()
    teleop_interface.reset()

    print("\n" + "=" * 50)
    print(" 🚀 Teleoperation Started!")
    print(" 🎮 Use the TRIGGER to open/close gripper.")
    print(" ✊ Hold GRIP and move the controller to move the arm.")
    print(" 🕹️ Left joystick: Y=forward/back, X=turn left/right.")
    print(" 🔄 Press B or Y to reset the environment.")
    print("=" * 50 + "\n")

    device = env.unwrapped.device
    sim_frame = 0
    obs, _ = env.reset()
    clear_ik_target_state()

    while simulation_app.is_running():
        try:
            with torch.inference_mode():
                # Handle XR-button-requested resets before stepping.
                if should_reset:
                    obs, _ = env.reset()
                    clear_ik_target_state()
                    teleop_interface.reset()
                    should_reset = False
                    sim_frame = 0
                    continue

                # Read current EEF in world frame from observations.
                policy_obs = obs["policy"]
                eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
                eef_quat = policy_obs["eef_quat"][0].cpu().numpy()

                # Get action from XR Controller (world frame for absolute mode).
                action_np = teleop_interface.advance(
                    current_eef_pos=eef_pos,
                    current_eef_quat=eef_quat,
                )

                # IK expects root-frame commands; convert just before sending.
                if is_abs_mode:
                    action_np = convert_action_world_to_root(action_np)

                # Action manager order: arm_action | wheel_action | gripper_action
                # arm=7, wheel=4, gripper=1 → total 12 dims.
                wheel_np = get_wheel_action()
                # Splice the wheel command between the arm pose and the gripper channel.
                action_np = torch.cat([action_np[:7], wheel_np, action_np[7:]])

                # Broadcast the single teleop action to all environments.
                actions = action_np.unsqueeze(0).repeat(env.num_envs, 1).to(device)

                # Step environment
                obs, _, _, _, _ = env.step(actions)

                # Print robot state every 30 frames
                sim_frame += 1
                if sim_frame % 30 == 0:
                    np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
                    policy_obs = obs["policy"]
                    joint_pos = policy_obs["joint_pos"][0].cpu().numpy()
                    eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
                    eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
                    last_act = policy_obs["actions"][0].cpu().numpy()

                    # On first print, dump ALL joint names + positions to identify indices
                    if sim_frame == 30:
                        robot = env.unwrapped.scene["robot"]
                        jnames = robot.joint_names
                        print(f"\n{'='*70}")
                        print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS (relative)")
                        print(f"{'='*70}")
                        for i, name in enumerate(jnames):
                            print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}")
                        print(f"{'='*70}")
                        # Find arm joint indices dynamically by looking at the first 6-7 joints that aren't fingers or hands
                        arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
                        print(f" Deduced left arm indices: {arm_idx}")
                        print(f"{'='*70}\n")

                    # Get arm indices (cache-friendly: find once)
                    if not hasattr(env, '_arm_idx_cache'):
                        robot = env.unwrapped.scene["robot"]
                        jnames = robot.joint_names
                        env._arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
                    arm_idx = env._arm_idx_cache
                    arm_joints = joint_pos[arm_idx]

                    try:
                        joy_dbg = teleop_interface.xr_client.get_joystick("left")
                        joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]"
                    except Exception:
                        # Joystick read is debug-only; show a placeholder on failure.
                        joy_str = "N/A"

                    print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
                    print(f"| Left Arm Joints (rad): {arm_joints}")
                    print(f"| EEF Pos (world): {eef_pos}")
                    print(f"| EEF Quat (world, wxyz): {eef_quat}")
                    print(f"| Last Action Sent: {last_act}")
                    print(f"| Joystick (x=turn,y=fwd): {joy_str}")
                    print(f"----------------------------------------------------------------")

        except Exception as e:
            # Abort the teleop loop on any stepping error; cleanup still runs below.
            logger.error(f"Error during simulation step: {e}")
            break

    teleop_interface.close()
    env.close()


if __name__ == "__main__":
    main()
    simulation_app.close()