#!/usr/bin/env python3 # Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). # All rights reserved. # # SPDX-License-Identifier: BSD-3-Clause """ Script to run teleoperation with Isaac Lab manipulation environments using PICO XR Controllers. Only absolute IK mode is supported (Isaac-MindRobot-LeftArm-IK-Abs-v0). The controller computes delta pose in Isaac Sim world frame and accumulates an absolute EEF target, which is then converted to robot root frame before being sent to the DifferentialIK action manager. """ import argparse import logging import sys import os from collections.abc import Callable # Ensure xr_utils (next to this script) is importable when running directly sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from isaaclab.app import AppLauncher logger = logging.getLogger(__name__) # add argparse arguments parser = argparse.ArgumentParser( description="Teleoperation for Isaac Lab environments with PICO XR Controller (absolute IK mode only)." ) parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.") parser.add_argument( "--task", type=str, default="Isaac-MindRobot-LeftArm-IK-Abs-v0", help="Name of the task (must be an Abs IK task).", ) parser.add_argument( "--sensitivity", type=float, default=None, help="Set both pos and rot sensitivity (overridden by --pos_sensitivity/--rot_sensitivity).", ) parser.add_argument("--pos_sensitivity", type=float, default=None, help="Position sensitivity. Default: 1.0.") parser.add_argument("--rot_sensitivity", type=float, default=None, help="Rotation sensitivity. Default: 0.3.") parser.add_argument("--arm", type=str, default="left", choices=["left", "right"], help="Which arm/controller to use.") parser.add_argument("--base_speed", type=float, default=5.0, help="Max wheel speed (rad/s) for joystick full forward.") parser.add_argument("--base_turn", type=float, default=2.0, help="Max wheel differential (rad/s) for joystick full left/right.") parser.add_argument("--drive_speed", type=float, default=0.5, help="Max robot linear speed (m/s) for direct base control. Default: 0.5") parser.add_argument("--drive_turn", type=float, default=1.5, help="Max robot yaw rate (rad/s) for direct base control. Default: 1.5") AppLauncher.add_app_launcher_args(parser) args_cli = parser.parse_args() app_launcher_args = vars(args_cli) app_launcher_args["xr"] = False app_launcher = AppLauncher(app_launcher_args) simulation_app = app_launcher.app """Rest everything follows.""" import gymnasium as gym import numpy as np import torch from scipy.spatial.transform import Rotation as R from isaaclab.envs import ManagerBasedRLEnvCfg import isaaclab_tasks # noqa: F401 import mindbot.tasks # noqa: F401 from isaaclab_tasks.utils import parse_env_cfg from xr_utils import XrClient, transform_xr_pose, quat_diff_as_rotvec_xyzw, is_valid_quaternion from xr_utils.geometry import R_HEADSET_TO_WORLD # ===================================================================== # Robot Camera Viewport Utilities # ===================================================================== # Resolved prim paths for env_0 — must match CameraCfg prim_path with # {ENV_REGEX_NS} → /World/envs/env_0 _ROBOT_CAM_PRIMS: dict[str, str] = { "Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand", "Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand", "Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft", "Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest", } # Stores created viewport window objects _robot_viewports: dict[str, object] = {} def create_robot_viewports() -> None: """Create 4 viewport windows and bind each to a robot camera prim. Must be called after env.reset() so all prims exist on the USD stage. """ try: import omni.kit.viewport.utility as vp_util except ImportError: logger.warning("[Viewport] omni.kit.viewport.utility not available — skipping viewport creation.") return for name, cam_path in _ROBOT_CAM_PRIMS.items(): vp_win = vp_util.create_viewport_window( f"Robot {name} View", width=640, height=360 ) vp_win.viewport_api.camera_path = cam_path _robot_viewports[name] = vp_win print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}") def rebind_robot_viewports() -> None: """Re-bind all robot viewports to their camera paths. Call this after every env reset so viewports stay locked to the cameras. """ for name, vp_win in _robot_viewports.items(): cam_path = _ROBOT_CAM_PRIMS[name] vp_win.viewport_api.camera_path = cam_path # ===================================================================== # Helpers # ===================================================================== def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R: """Convert Isaac-style wxyz quaternion to scipy Rotation.""" return R.from_quat([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]]) def _rotation_to_quat_wxyz(rot: R) -> np.ndarray: """Convert scipy Rotation to Isaac-style wxyz quaternion.""" q = rot.as_quat() return np.array([q[3], q[0], q[1], q[2]]) def world_pose_to_root_frame( pos_w: np.ndarray, quat_wxyz: np.ndarray, root_pos_w: np.ndarray, root_quat_wxyz: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: """Express a world-frame pose in the robot root frame.""" root_rot = _quat_wxyz_to_rotation(root_quat_wxyz) pose_rot = _quat_wxyz_to_rotation(quat_wxyz) pos_root = root_rot.inv().apply(pos_w - root_pos_w) quat_root = _rotation_to_quat_wxyz(root_rot.inv() * pose_rot) return pos_root, quat_root # ===================================================================== # Teleoperation Controller # ===================================================================== class XrTeleopController: """Teleop controller for PICO XR headset (absolute IK mode). Accumulates an absolute EEF target in Isaac Sim world frame. Grip button acts as clutch; B/Y buttons trigger environment reset. """ def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3, max_pos_per_step=0.05, max_rot_per_step=0.08, xr_client=None): self.xr_client = xr_client if xr_client is not None else XrClient() self._owns_client = xr_client is None # only close if we created it self.pos_sensitivity = pos_sensitivity self.rot_sensitivity = rot_sensitivity # Hard caps per physics step to prevent IK divergence. # max_rot_per_step ~4.6°, max_pos_per_step 5 cm. self.max_pos_per_step = max_pos_per_step self.max_rot_per_step = max_rot_per_step self.arm = arm self.controller_name = "left_controller" if arm == "left" else "right_controller" self.grip_name = "left_grip" if arm == "left" else "right_grip" self.trigger_name = "left_trigger" if arm == "left" else "right_trigger" self.R_headset_world = R_HEADSET_TO_WORLD self.prev_xr_pos = None self.prev_xr_quat = None self.target_eef_pos = None # world frame self.target_eef_quat = None # world frame, wxyz self.grip_active = False self.frame_count = 0 self.reset_button_latched = False self.require_grip_reengage = False self.grip_engage_threshold = 0.8 self.grip_release_threshold = 0.2 self.callbacks = {} def add_callback(self, name: str, func: Callable): self.callbacks[name] = func def reset(self): self.prev_xr_pos = None self.prev_xr_quat = None self.grip_active = False self.frame_count = 0 self.target_eef_pos = None self.target_eef_quat = None # Require grip release after reset to avoid driving to stale pose. self.require_grip_reengage = True def close(self): if self._owns_client: self.xr_client.close() def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor: """Return 8D hold action [x, y, z, qw, qx, qy, qz, gripper] at frozen target pose.""" action = torch.zeros(8) if self.target_eef_pos is not None and self.target_eef_quat is not None: # Prefer frozen world-frame target so IK holds a fixed world position # even if the robot chassis drifts slightly under gravity. action[:3] = torch.tensor(self.target_eef_pos.copy()) action[3:7] = torch.tensor(self.target_eef_quat.copy()) elif current_eef_pos is not None and current_eef_quat is not None: action[:3] = torch.tensor(current_eef_pos.copy()) action[3:7] = torch.tensor(current_eef_quat.copy()) else: action[3] = 1.0 # identity quaternion action[7] = 1.0 if trigger > 0.5 else -1.0 return action def advance(self, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor: """Read XR controller and return 8D absolute action [x, y, z, qw, qx, qy, qz, gripper]. Position and quaternion are in Isaac Sim world frame. Caller must convert to robot root frame before sending to IK. """ try: reset_pressed = self.xr_client.get_button("B") or self.xr_client.get_button("Y") if reset_pressed and not self.reset_button_latched: if "RESET" in self.callbacks: self.callbacks["RESET"]() self.reset_button_latched = reset_pressed except Exception: pass try: raw_pose = self.xr_client.get_pose(self.controller_name) grip = self.xr_client.get_key_value(self.grip_name) trigger = self.xr_client.get_key_value(self.trigger_name) except Exception: return self.get_zero_action(0.0, current_eef_pos, current_eef_quat) if not is_valid_quaternion(raw_pose[3:]): return self.get_zero_action(trigger, current_eef_pos, current_eef_quat) # Wait for grip release after reset if self.require_grip_reengage: if grip <= self.grip_release_threshold: self.require_grip_reengage = False else: if current_eef_pos is not None and current_eef_quat is not None: self.target_eef_pos = current_eef_pos.copy() self.target_eef_quat = current_eef_quat.copy() return self.get_zero_action(trigger, current_eef_pos, current_eef_quat) # Grip button as clutch with hysteresis grip_pressed = grip > self.grip_release_threshold if self.grip_active else grip >= self.grip_engage_threshold if not grip_pressed: self.prev_xr_pos = None self.prev_xr_quat = None # Only snapshot target on the transition frame (grip just released) or first ever frame. # After that, keep it frozen so IK holds a fixed world-frame position. if self.grip_active or self.target_eef_pos is None: if current_eef_pos is not None and current_eef_quat is not None: self.target_eef_pos = current_eef_pos.copy() self.target_eef_quat = current_eef_quat.copy() self.grip_active = False return self.get_zero_action(trigger, current_eef_pos, current_eef_quat) if not self.grip_active: self.prev_xr_pos = raw_pose[:3].copy() self.prev_xr_quat = raw_pose[3:].copy() if current_eef_pos is not None and current_eef_quat is not None: self.target_eef_pos = current_eef_pos.copy() self.target_eef_quat = current_eef_quat.copy() self.grip_active = True return self.get_zero_action(trigger, current_eef_pos, current_eef_quat) # Project current and previous XR poses into Isaac Sim world frame xr_world_pos, xr_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:]) prev_xr_world_pos, prev_xr_world_quat_xyzw = transform_xr_pose(self.prev_xr_pos, self.prev_xr_quat) # Delta position (world frame), clamped per step world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity pos_norm = np.linalg.norm(world_delta_pos) if pos_norm > self.max_pos_per_step: world_delta_pos *= self.max_pos_per_step / pos_norm # Delta rotation (world frame), clamped per step world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity rot_norm = np.linalg.norm(world_delta_rot) if rot_norm > self.max_rot_per_step: world_delta_rot *= self.max_rot_per_step / rot_norm gripper_action = 1.0 if trigger > 0.5 else -1.0 # Accumulate absolute target if self.target_eef_pos is None: self.target_eef_pos = np.zeros(3) self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0]) self.target_eef_pos += world_delta_pos # Apply rotation delta in world frame so controller direction = EEF world direction target_r = _quat_wxyz_to_rotation(self.target_eef_quat) self.target_eef_quat = _rotation_to_quat_wxyz(R.from_rotvec(world_delta_rot) * target_r) self.prev_xr_pos = raw_pose[:3].copy() self.prev_xr_quat = raw_pose[3:].copy() self.frame_count += 1 action = torch.tensor([ *self.target_eef_pos, *self.target_eef_quat, gripper_action, ], dtype=torch.float32) if self.frame_count % 30 == 0: np.set_printoptions(precision=4, suppress=True, floatmode='fixed') print("\n====================== [VR TELEOP DEBUG] ======================") print(f"| Raw VR Pos (OpenXR): {np.array(raw_pose[:3])}") print(f"| Raw VR Quat (xyzw): {np.array(raw_pose[3:])}") print(f"| XR Delta Pos (world): {world_delta_pos} (norm={np.linalg.norm(world_delta_pos):.4f})") print(f"| XR Delta Rot (world): {world_delta_rot} (norm={np.linalg.norm(world_delta_rot):.4f})") print(f"| Targ Pos (world): {action[:3].numpy()}") print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}") print(f"| Gripper & Trigger: Grip={grip:.2f}, Trig={trigger:.2f}") print("===============================================================") return action # ===================================================================== # Head Tracking Utility # ===================================================================== def get_head_joint_targets( xr_client, neutral_rot: R | None ) -> tuple[np.ndarray, R | None]: """Extract head joint targets [yaw, pitch] from HMD pose. At first call (neutral_rot is None) the current headset orientation is recorded as the neutral reference and zeros are returned. Subsequent calls return yaw/pitch relative to that neutral pose. Returns: (joint_targets, neutral_rot) where joint_targets is shape (2,) float32 [head_yaw_Joint_rad, head_pitch_Joint_rad]. """ try: raw_pose = xr_client.get_pose("headset") if not is_valid_quaternion(raw_pose[3:]): return np.zeros(2, dtype=np.float32), neutral_rot _, head_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:]) head_rot = R.from_quat(head_world_quat_xyzw) if neutral_rot is None: # First call — record neutral and hold zero return np.zeros(2, dtype=np.float32), head_rot # Relative rotation from neutral heading rel_rot = head_rot * neutral_rot.inv() # ZYX Euler: [0]=yaw (Z), [1]=pitch (Y) euler_zyx = rel_rot.as_euler("ZYX") yaw = float(np.clip(euler_zyx[0], -1.57, 1.57)) # ±90° pitch = float(np.clip(euler_zyx[1], -0.52, 0.52)) # ±30° return np.array([yaw, pitch], dtype=np.float32), neutral_rot except Exception: return np.zeros(2, dtype=np.float32), neutral_rot # ===================================================================== # Main Execution Loop # ===================================================================== def main() -> None: """Run teleoperation with PICO XR Controller against Isaac Lab environment.""" env_cfg = parse_env_cfg(args_cli.task, num_envs=args_cli.num_envs) env_cfg.env_name = args_cli.task if not isinstance(env_cfg, ManagerBasedRLEnvCfg): raise ValueError(f"Teleoperation requires ManagerBasedRLEnvCfg. Got: {type(env_cfg)}") if "Abs" not in args_cli.task: raise ValueError( f"Task '{args_cli.task}' is not an absolute IK task. " "Only Abs tasks are supported (e.g. Isaac-MindRobot-LeftArm-IK-Abs-v0)." ) env_cfg.terminations.time_out = None try: env = gym.make(args_cli.task, cfg=env_cfg).unwrapped except Exception as e: logger.error(f"Failed to create environment '{args_cli.task}': {e}") simulation_app.close() return # Detect single-arm vs dual-arm based on action manager term names is_dual_arm = "left_arm_action" in env.action_manager._terms # Sensitivity: explicit args override --sensitivity, which overrides defaults pos_sens = 1.0 rot_sens = 0.3 if args_cli.sensitivity is not None: pos_sens = args_cli.sensitivity rot_sens = args_cli.sensitivity if args_cli.pos_sensitivity is not None: pos_sens = args_cli.pos_sensitivity if args_cli.rot_sensitivity is not None: rot_sens = args_cli.rot_sensitivity if is_dual_arm: print(f"\n[INFO] Dual-arm mode detected. Using both controllers.") print(f"[INFO] IK Mode: ABSOLUTE") print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}") shared_client = XrClient() teleop_left = XrTeleopController(arm="left", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client) teleop_right = XrTeleopController(arm="right", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client) teleop_interface = teleop_left # primary interface for callbacks/reset teleop_right_ref = teleop_right else: print(f"\n[INFO] Connecting to PICO XR Headset using {args_cli.arm} controller...") print(f"[INFO] IK Mode: ABSOLUTE") print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}") teleop_interface = XrTeleopController( arm=args_cli.arm, pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, ) should_reset = False def request_reset(): nonlocal should_reset should_reset = True print("[INFO] Reset requested via XR button.") teleop_interface.add_callback("RESET", request_reset) def clear_ik_target_state(): """Sync IK controller's internal target to current EEF pose to avoid jumps after reset.""" if is_dual_arm: for term_key in ["left_arm_action", "right_arm_action"]: arm_action_term = env.action_manager._terms[term_key] ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose() arm_action_term._raw_actions.zero_() arm_action_term._processed_actions.zero_() arm_action_term._ik_controller._command.zero_() arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b else: arm_action_term = env.action_manager._terms["arm_action"] ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose() arm_action_term._raw_actions.zero_() arm_action_term._processed_actions.zero_() arm_action_term._ik_controller._command.zero_() arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b def convert_action_world_to_root(action_tensor: torch.Tensor) -> torch.Tensor: """Convert absolute IK action from world frame to robot root frame.""" robot = env.unwrapped.scene["robot"] root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy() root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy() pos_root, quat_root = world_pose_to_root_frame( action_tensor[:3].numpy(), action_tensor[3:7].numpy(), root_pos_w, root_quat_w, ) out = action_tensor.clone() out[:3] = torch.tensor(pos_root, dtype=torch.float32) out[3:7] = torch.tensor(quat_root, dtype=torch.float32) return out def get_chassis_commands() -> tuple[torch.Tensor, float, float]: """Read left joystick → (wheel_cmd 4D, v_fwd m/s, omega rad/s). wheel_cmd : differential wheel velocities for the action vector (training data). v_fwd : desired robot forward speed in m/s (for direct base control). omega : desired robot yaw rate in rad/s (for direct base control). """ try: joy = teleop_interface.xr_client.get_joystick("left") jy = float(joy[1]) jx = float(joy[0]) except Exception: return torch.zeros(4), 0.0, 0.0 # Wheel velocity commands for action vector (rad/s) v_w = jy * args_cli.base_speed omega_w = jx * args_cli.base_turn right_vel = v_w - omega_w left_vel = v_w + omega_w wheel_cmd = torch.tensor([right_vel, left_vel, left_vel, right_vel], dtype=torch.float32) if abs(v_w) > 0.1 or abs(omega_w) > 0.1: print(f"[WHEEL] joy=({jx:+.2f},{jy:+.2f}) v_wheel={v_w:.2f} ω_wheel={omega_w:.2f} → {wheel_cmd.tolist()}") # Direct base velocity commands (m/s, rad/s) v_fwd = jy * args_cli.drive_speed omega = -jx * args_cli.drive_turn # jx<0 (left push) → positive yaw = left turn return wheel_cmd, v_fwd, omega def apply_base_velocity(v_fwd: float, omega: float) -> None: """Directly set robot root linear+angular velocity to bypass isotropic friction. PhysX uses isotropic friction, so skid-steer lateral slip is impossible through wheel torques alone. This function overrides the base velocity each step to give clean translational + rotational motion regardless of friction. The write is buffered and applied at the start of the next env.step(). """ if abs(v_fwd) < 1e-4 and abs(omega) < 1e-4: return robot = env.unwrapped.scene["robot"] rq = robot.data.root_quat_w # [N, 4] wxyz w_q, x_q, y_q, z_q = rq[:, 0], rq[:, 1], rq[:, 2], rq[:, 3] # Robot forward direction in world frame (rotate [1,0,0] by root quaternion) fwd_x = 1.0 - 2.0 * (y_q * y_q + z_q * z_q) fwd_y = 2.0 * (x_q * y_q + w_q * z_q) vel = torch.zeros(env.num_envs, 6, device=device) vel[:, 0] = v_fwd * fwd_x # world x linear vel[:, 1] = v_fwd * fwd_y # world y linear # vel[:, 2] = 0 # z: let physics handle gravity/contact vel[:, 5] = omega # world z angular (yaw) robot.write_root_velocity_to_sim(vel) obs, _ = env.reset() clear_ik_target_state() teleop_interface.reset() # Create 4 viewport windows bound to the robot cameras create_robot_viewports() if is_dual_arm: teleop_right_ref.reset() print("\n" + "=" * 50) print(" Teleoperation Started!") if is_dual_arm: print(" LEFT controller → left arm") print(" RIGHT controller → right arm") print(" TRIGGER: open/close gripper") print(" GRIP (hold): move the arm") print(" Left joystick: Y=forward/back, X=turn") print(" B / Y: reset environment") print("=" * 50 + "\n") device = env.unwrapped.device sim_frame = 0 obs, _ = env.reset() clear_ik_target_state() last_root_left = None last_root_right = None neutral_head_rot = None # calibrated on first headset sample after reset while simulation_app.is_running(): try: with torch.inference_mode(): if should_reset: obs, _ = env.reset() clear_ik_target_state() teleop_interface.reset() if is_dual_arm: teleop_right_ref.reset() rebind_robot_viewports() should_reset = False sim_frame = 0 last_root_left = None last_root_right = None neutral_head_rot = None # re-calibrate on next headset sample continue policy_obs = obs["policy"] wheel_np, v_fwd, omega_base = get_chassis_commands() # Head tracking: HMD yaw/pitch relative to neutral pose head_cmd_np, neutral_head_rot = get_head_joint_targets( teleop_interface.xr_client, neutral_head_rot ) head_cmd = torch.tensor(head_cmd_np, dtype=torch.float32) if is_dual_arm: eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy() eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy() eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy() eef_quat_right = policy_obs["eef_quat_right"][0].cpu().numpy() left_action = teleop_left.advance(current_eef_pos=eef_pos_left, current_eef_quat=eef_quat_left) right_action = teleop_right_ref.advance(current_eef_pos=eef_pos_right, current_eef_quat=eef_quat_right) # Only recompute root-frame IK target when grip is active; otherwise freeze joints if teleop_left.grip_active or last_root_left is None: last_root_left = convert_action_world_to_root(left_action)[:7].clone() if teleop_right_ref.grip_active or last_root_right is None: last_root_right = convert_action_world_to_root(right_action)[:7].clone() # Action layout: left_arm(7) | wheel(4) | left_gripper(1) | right_arm(7) | right_gripper(1) | head(2) action_np = torch.cat([ last_root_left, wheel_np, left_action[7:8], last_root_right, right_action[7:8], head_cmd, ]) else: eef_pos = policy_obs["eef_pos"][0].cpu().numpy() eef_quat = policy_obs["eef_quat"][0].cpu().numpy() action_np = teleop_interface.advance(current_eef_pos=eef_pos, current_eef_quat=eef_quat) if teleop_interface.grip_active or last_root_left is None: last_root_left = convert_action_world_to_root(action_np)[:7].clone() # Action layout: arm(7) | wheel(4) | gripper(1) action_np = torch.cat([last_root_left, wheel_np, action_np[7:8]]) actions = action_np.unsqueeze(0).repeat(env.num_envs, 1).to(device) obs, _, _, _, _ = env.step(actions) # Direct base velocity override: bypasses isotropic friction limitation # so skid-steer turning works correctly. apply_base_velocity(v_fwd, omega_base) sim_frame += 1 if sim_frame % 30 == 0: np.set_printoptions(precision=4, suppress=True, floatmode='fixed') policy_obs = obs["policy"] joint_pos = policy_obs["joint_pos"][0].cpu().numpy() last_act = policy_obs["actions"][0].cpu().numpy() if sim_frame == 30: robot = env.unwrapped.scene["robot"] jnames = robot.joint_names print(f"\n{'='*70}") print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS") print(f"{'='*70}") for i, name in enumerate(jnames): print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}") arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")] print(f" Deduced left arm indices: {arm_idx}") print(f"{'='*70}\n") if not hasattr(env, '_arm_idx_cache'): robot = env.unwrapped.scene["robot"] jnames = robot.joint_names env._arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("l_joint")] env._right_arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("r_joint")] arm_joints = joint_pos[env._arm_idx_cache] right_arm_joints = joint_pos[env._right_arm_idx_cache] try: joy_dbg = teleop_interface.xr_client.get_joystick("left") joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]" except Exception: joy_str = "N/A" if is_dual_arm: eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy() eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy() eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy() print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------") print(f"| Left Arm Joints (rad): {arm_joints}") print(f"| Right Arm Joints (rad): {right_arm_joints}") print(f"| Left EEF Pos (world, m): {eef_pos_left}") print(f"| Right EEF Pos (world, m): {eef_pos_right}") print(f"| Cmd left_arm(abs): {last_act[:7]}") print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}") print(f"| Cmd left_gripper: {last_act[11:12]} (+1=open -1=close)") print(f"| Cmd right_arm(abs): {last_act[12:19]}") print(f"| Cmd right_gripper: {last_act[19:]} (+1=open -1=close)") print(f"| Joystick (x=turn,y=fwd): {joy_str}") print(f"----------------------------------------------------------------") else: eef_pos = policy_obs["eef_pos"][0].cpu().numpy() eef_quat = policy_obs["eef_quat"][0].cpu().numpy() if not hasattr(env, '_prev_eef_quat'): env._prev_eef_quat = eef_quat.copy() prev_q = env._prev_eef_quat r_prev = R.from_quat([prev_q[1], prev_q[2], prev_q[3], prev_q[0]]) r_curr = R.from_quat([eef_quat[1], eef_quat[2], eef_quat[3], eef_quat[0]]) delta_rotvec = (r_curr * r_prev.inv()).as_rotvec() delta_angle_deg = np.degrees(np.linalg.norm(delta_rotvec)) delta_axis = delta_rotvec / (np.linalg.norm(delta_rotvec) + 1e-9) env._prev_eef_quat = eef_quat.copy() approach_vec = r_curr.apply(np.array([1.0, 0.0, 0.0])) print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------") print(f"| Left Arm Joints (rad): {arm_joints}") print(f"| EEF Pos (world, m): {eef_pos}") print(f"| EEF approach vec (world): [{approach_vec[0]:+.3f}, {approach_vec[1]:+.3f}, {approach_vec[2]:+.3f}] (夹爪朝向)") print(f"| EEF rot since last print: {delta_angle_deg:+.2f}° around [{delta_axis[0]:+.3f},{delta_axis[1]:+.3f},{delta_axis[2]:+.3f}] (world)") print(f"| Cmd arm(abs pos+quat): {last_act[:7]}") print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}") print(f"| Cmd gripper: {last_act[11:]} (+1=open -1=close)") print(f"| Joystick (x=turn,y=fwd): {joy_str}") print(f"----------------------------------------------------------------") except Exception as e: logger.error(f"Error during simulation step: {e}") break teleop_interface.close() if is_dual_arm: teleop_right_ref.close() shared_client.close() env.close() if __name__ == "__main__": main() simulation_app.close()