dual arm XR teleoperation: env cfg, shared client, joint locking

- Add MindRobotDualArmIKAbsEnvCfg: standalone dual-arm env inheriting
  ManagerBasedRLEnvCfg directly (no single-arm dependency), 20D action
  space (left_arm7 | wheel4 | left_gripper1 | right_arm7 | right_gripper1)
- Add local mdp/ with parameterized gripper_pos(joint_names) to support
  independent left/right gripper observations without modifying IsaacLab
- Update teleop_xr_agent.py for dual-arm mode: shared XrClient to avoid
  SDK double-init crash, root-frame IK command caching so arm joints are
  locked when grip not pressed (EEF world pos still tracks chassis)
- Tune mindrobot_cfg.py initial poses with singularity-avoiding offsets
- Add CLAUDE.md project instructions and debug_action_assembly.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-14 13:31:26 +08:00
parent 0c557938a7
commit 7be48b3964
10 changed files with 986 additions and 268 deletions

View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
验证 teleop_xr_agent.py 中相对模式下动作拼接顺序的正确性。
无需 Isaac Sim / XR 设备,纯 CPU 运行。
运行方式:
python scripts/debug_action_assembly.py
"""
import torch
# ============================================================
# Simulated action-manager split logic
# ============================================================
# Action layout from mindrobot_left_arm_ik_env_cfg.py:
#   arm_action    : DifferentialInverseKinematicsActionCfg (6D relative / 7D absolute)
#   wheel_action  : JointVelocityActionCfg (4 wheels -> 4D)
#   gripper_action: BinaryJointPositionActionCfg (1D)
def action_manager_split(action: torch.Tensor, arm_size: int):
    """Mimic how Isaac Lab's ManagerBasedRLEnv slices the flat action vector.

    Args:
        action: flat action tensor whose last dim must equal arm_size + 4 + 1.
        arm_size: number of arm-action dims (6 for relative IK, 7 for absolute).

    Returns:
        Tuple of (arm, wheel, gripper) slices along the last dimension.
    """
    assert action.shape[-1] == arm_size + 4 + 1, \
        f"期望总维度 {arm_size + 4 + 1},实际 {action.shape[-1]}"
    wheel_start, grip_start = arm_size, arm_size + 4
    return (
        action[..., :wheel_start],
        action[..., wheel_start:grip_start],
        action[..., grip_start:],
    )
# ============================================================
# Reproduction of the concatenation at teleop_xr_agent.py:474
# ============================================================
def current_assembly(action_np: torch.Tensor, wheel_np: torch.Tensor) -> torch.Tensor:
    """Existing concatenation logic, copied verbatim from teleop_xr_agent.py:474.

    Always splits after index 7, which is only correct for the 8D absolute
    action — kept as-is so the script can demonstrate the relative-mode bug.
    """
    head = action_np[:7]
    tail = action_np[7:]
    return torch.cat((head, wheel_np, tail))
def fixed_assembly(action_np: torch.Tensor, wheel_np: torch.Tensor, is_abs_mode: bool) -> torch.Tensor:
    """Corrected concatenation: split arm and gripper at the mode-dependent boundary.

    Args:
        action_np: [arm..., gripper] action (7D relative / 8D absolute).
        wheel_np: 4D wheel velocity command to insert between arm and gripper.
        is_abs_mode: True for absolute IK (7 arm dims), False for relative (6).
    """
    n_arm = 7 if is_abs_mode else 6
    pieces = (action_np[:n_arm], wheel_np, action_np[n_arm:])
    return torch.cat(pieces)
# ============================================================
# Tests
# ============================================================
def run_test(mode: str):
    """Compare current vs fixed assembly for one IK mode ("relative" or "absolute").

    Prints the split results and a per-component pass/fail verdict.
    BUGFIX: ``arm_ok`` was previously hard-coded to ``True`` in both branches,
    so the arm slice was reported as correct without ever being checked.
    It is now verified against ``fake_action[:arm_size]``.
    """
    is_abs = (mode == "absolute")
    arm_size = 7 if is_abs else 6
    # Construct easily recognizable test values.
    if is_abs:
        # Absolute mode: [x,y,z,qw,qx,qy,qz, gripper]
        fake_action = torch.tensor([0.1, 0.2, 0.3,       # pos
                                    1.0, 0.0, 0.0, 0.0,  # quat
                                    0.9])                # gripper (trigger=0.9)
    else:
        # Relative mode: [dx,dy,dz, drx,dry,drz, gripper]
        fake_action = torch.tensor([0.01, 0.02, 0.03,  # delta pos
                                    0.05, 0.06, 0.07,  # delta rot
                                    0.9])              # gripper (trigger=0.9)
    fake_wheel = torch.tensor([1.5, 1.5, 1.5, 1.5])  # drive forward
    expected_arm = fake_action[:arm_size]
    print(f"\n{'='*60}")
    print(f" 测试模式: {mode.upper()}")
    print(f"{'='*60}")
    print(f" 原始 action_np : {fake_action.tolist()}")
    print(f" 原始 wheel_np : {fake_wheel.tolist()}")
    print(f" 期望 gripper 值 : {fake_action[-1].item()} (应为 0.9)")
    print(f" 期望 wheel 值 : {fake_wheel.tolist()} (应为 [1.5,1.5,1.5,1.5])")
    # ---- current (buggy) code path ----
    assembled_current = current_assembly(fake_action, fake_wheel)
    try:
        arm_c, wheel_c, grip_c = action_manager_split(assembled_current, arm_size)
        print(f"\n [现有代码]")
        print(f" 拼接结果({assembled_current.shape[0]}D) : {assembled_current.tolist()}")
        print(f" → arm ({arm_c.shape[0]}D) : {arm_c.tolist()}")
        print(f" → wheel ({wheel_c.shape[0]}D) : {wheel_c.tolist()}")
        print(f" → gripper ({grip_c.shape[0]}D) : {grip_c.tolist()}")
        # Actually verify the arm slice (was hard-coded True before).
        arm_ok = torch.allclose(arm_c, expected_arm)
        wheel_ok = torch.allclose(wheel_c, fake_wheel)
        grip_ok = torch.allclose(grip_c, fake_action[-1:])
        print(f" arm 正确? {'✅' if arm_ok else '❌'} | wheel 正确? {'✅' if wheel_ok else '❌ ← BUG'} | gripper 正确? {'✅' if grip_ok else '❌ ← BUG'}")
    except AssertionError as e:
        print(f" [现有代码] 维度断言失败: {e}")
    # ---- fixed code path ----
    assembled_fixed = fixed_assembly(fake_action, fake_wheel, is_abs)
    try:
        arm_f, wheel_f, grip_f = action_manager_split(assembled_fixed, arm_size)
        print(f"\n [修正后代码]")
        print(f" 拼接结果({assembled_fixed.shape[0]}D) : {assembled_fixed.tolist()}")
        print(f" → arm ({arm_f.shape[0]}D) : {arm_f.tolist()}")
        print(f" → wheel ({wheel_f.shape[0]}D) : {wheel_f.tolist()}")
        print(f" → gripper ({grip_f.shape[0]}D) : {grip_f.tolist()}")
        # Actually verify the arm slice (was hard-coded True before).
        arm_ok = torch.allclose(arm_f, expected_arm)
        wheel_ok = torch.allclose(wheel_f, fake_wheel)
        grip_ok = torch.allclose(grip_f, fake_action[-1:])
        print(f" arm 正确? {'✅' if arm_ok else '❌'} | wheel 正确? {'✅' if wheel_ok else '❌'} | gripper 正确? {'✅' if grip_ok else '❌'}")
    except AssertionError as e:
        print(f" [修正后代码] 维度断言失败: {e}")
if __name__ == "__main__":
    # Exercise both IK modes, then finish with a blank line for readability.
    for _mode in ("relative", "absolute"):
        run_test(_mode)
    print()

View File

@@ -6,7 +6,11 @@
"""
Script to run teleoperation with Isaac Lab manipulation environments using PICO XR Controllers.
This script uses XRoboToolkit to fetch XR controller poses and maps them to differential IK actions.
Only absolute IK mode is supported (Isaac-MindRobot-LeftArm-IK-Abs-v0).
The controller computes delta pose in Isaac Sim world frame and accumulates
an absolute EEF target, which is then converted to robot root frame before
being sent to the DifferentialIK action manager.
"""
import argparse
@@ -24,44 +28,30 @@ logger = logging.getLogger(__name__)
# add argparse arguments
parser = argparse.ArgumentParser(
description="Teleoperation for Isaac Lab environments with PICO XR Controller."
)
parser.add_argument(
"--num_envs", type=int, default=1, help="Number of environments to simulate."
description="Teleoperation for Isaac Lab environments with PICO XR Controller (absolute IK mode only)."
)
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument(
"--task",
type=str,
default="Isaac-MindRobot-LeftArm-IK-Rel-v0",
help="Name of the task.",
default="Isaac-MindRobot-LeftArm-IK-Abs-v0",
help="Name of the task (must be an Abs IK task).",
)
parser.add_argument(
"--sensitivity", type=float, default=5.0, help="Sensitivity factor for pos/rot."
"--sensitivity", type=float, default=None,
help="Set both pos and rot sensitivity (overridden by --pos_sensitivity/--rot_sensitivity).",
)
parser.add_argument(
"--arm",
type=str,
default="left",
choices=["left", "right"],
help="Which arm/controller to use.",
)
parser.add_argument(
"--base_speed", type=float, default=3.0,
help="Max wheel speed (rad/s) for joystick full forward.",
)
parser.add_argument(
"--base_turn", type=float, default=2.0,
help="Max wheel differential (rad/s) for joystick full left/right.",
)
# append AppLauncher cli args
parser.add_argument("--pos_sensitivity", type=float, default=None, help="Position sensitivity. Default: 1.0.")
parser.add_argument("--rot_sensitivity", type=float, default=None, help="Rotation sensitivity. Default: 0.3.")
parser.add_argument("--arm", type=str, default="left", choices=["left", "right"], help="Which arm/controller to use.")
parser.add_argument("--base_speed", type=float, default=3.0, help="Max wheel speed (rad/s) for joystick full forward.")
parser.add_argument("--base_turn", type=float, default=2.0, help="Max wheel differential (rad/s) for joystick full left/right.")
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
app_launcher_args = vars(args_cli)
# Disable some rendering settings to speed up
app_launcher_args["xr"] = False
# launch omniverse app
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app
@@ -83,7 +73,7 @@ from xr_utils.geometry import R_HEADSET_TO_WORLD
# =====================================================================
# Teleoperation Interface for XR
# Helpers
# =====================================================================
def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
@@ -92,9 +82,9 @@ def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
def _rotation_to_quat_wxyz(rot: R) -> np.ndarray:
"""Convert scipy Rotation quaternion to Isaac-style wxyz."""
quat_xyzw = rot.as_quat()
return np.array([quat_xyzw[3], quat_xyzw[0], quat_xyzw[1], quat_xyzw[2]])
"""Convert scipy Rotation to Isaac-style wxyz quaternion."""
q = rot.as_quat()
return np.array([q[3], q[0], q[1], q[2]])
def world_pose_to_root_frame(
@@ -111,30 +101,39 @@ def world_pose_to_root_frame(
return pos_root, quat_root
class XrTeleopController:
"""Teleop controller for PICO XR headset."""
# =====================================================================
# Teleoperation Controller
# =====================================================================
def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3, is_absolute=False):
self.xr_client = XrClient()
class XrTeleopController:
"""Teleop controller for PICO XR headset (absolute IK mode).
Accumulates an absolute EEF target in Isaac Sim world frame.
Grip button acts as clutch; B/Y buttons trigger environment reset.
"""
def __init__(self, arm="left", pos_sensitivity=1.0, rot_sensitivity=0.3,
max_pos_per_step=0.05, max_rot_per_step=0.08, xr_client=None):
self.xr_client = xr_client if xr_client is not None else XrClient()
self._owns_client = xr_client is None # only close if we created it
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
# Hard caps per physics step to prevent IK divergence.
# max_rot_per_step ~4.6°, max_pos_per_step 5 cm.
self.max_pos_per_step = max_pos_per_step
self.max_rot_per_step = max_rot_per_step
self.arm = arm
self.is_absolute = is_absolute
self.controller_name = "left_controller" if arm == "left" else "right_controller"
self.grip_name = "left_grip" if arm == "left" else "right_grip"
self.trigger_name = "left_trigger" if arm == "left" else "right_trigger"
# Coordinate transform matrix
self.R_headset_world = R_HEADSET_TO_WORLD
# Raw XR tracking space poses
self.prev_xr_pos = None
self.prev_xr_quat = None
# Absolute target states
self.target_eef_pos = None
self.target_eef_quat = None # wxyz
self.target_eef_pos = None # world frame
self.target_eef_quat = None # world frame, wxyz
self.grip_active = False
self.frame_count = 0
@@ -142,8 +141,6 @@ class XrTeleopController:
self.require_grip_reengage = False
self.grip_engage_threshold = 0.8
self.grip_release_threshold = 0.2
# Callbacks (like reset, etc)
self.callbacks = {}
def add_callback(self, name: str, func: Callable):
@@ -156,40 +153,35 @@ class XrTeleopController:
self.frame_count = 0
self.target_eef_pos = None
self.target_eef_quat = None
# Require one grip release after reset so stale controller motion
# cannot immediately drive the robot back toward the previous pose.
# Require grip release after reset to avoid driving to stale pose.
self.require_grip_reengage = True
def close(self):
self.xr_client.close()
if self._owns_client:
self.xr_client.close()
def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None):
if self.is_absolute:
action = torch.zeros(8)
# Stay at the current valid pose, or fallback to the cached target
if current_eef_pos is not None and current_eef_quat is not None:
action[:3] = torch.tensor(current_eef_pos.copy())
action[3:7] = torch.tensor(current_eef_quat.copy())
elif self.target_eef_pos is not None and self.target_eef_quat is not None:
action[:3] = torch.tensor(self.target_eef_pos.copy())
action[3:7] = torch.tensor(self.target_eef_quat.copy())
else:
action[3] = 1.0 # default w=1 for quat
action[7] = 1.0 if trigger > 0.5 else -1.0
def get_zero_action(self, trigger, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Return 8D hold action [x, y, z, qw, qx, qy, qz, gripper] at frozen target pose."""
action = torch.zeros(8)
if self.target_eef_pos is not None and self.target_eef_quat is not None:
# Prefer frozen world-frame target so IK holds a fixed world position
# even if the robot chassis drifts slightly under gravity.
action[:3] = torch.tensor(self.target_eef_pos.copy())
action[3:7] = torch.tensor(self.target_eef_quat.copy())
elif current_eef_pos is not None and current_eef_quat is not None:
action[:3] = torch.tensor(current_eef_pos.copy())
action[3:7] = torch.tensor(current_eef_quat.copy())
else:
action = torch.zeros(7)
action[6] = 1.0 if trigger > 0.5 else -1.0
action[3] = 1.0 # identity quaternion
action[7] = 1.0 if trigger > 0.5 else -1.0
return action
def advance(self, current_eef_pos=None, current_eef_quat=None) -> torch.Tensor:
"""Read XR controller and return 8D absolute action [x, y, z, qw, qx, qy, qz, gripper].
Position and quaternion are in Isaac Sim world frame.
Caller must convert to robot root frame before sending to IK.
"""
Reads the XR controller.
Relative bounds return 7D action tensor: [dx, dy, dz, drx, dry, drz, gripper]
Absolute bounds return 8D action tensor: [x, y, z, qw, qx, qy, qz, gripper]
Note: in absolute mode current_eef_* and the returned target are in WORLD frame.
The caller is responsible for converting to root frame before sending to IK.
"""
# XR buttons check (e.g. A or B for reset)
try:
reset_pressed = self.xr_client.get_button("B") or self.xr_client.get_button("Y")
if reset_pressed and not self.reset_button_latched:
@@ -203,141 +195,120 @@ class XrTeleopController:
raw_pose = self.xr_client.get_pose(self.controller_name)
grip = self.xr_client.get_key_value(self.grip_name)
trigger = self.xr_client.get_key_value(self.trigger_name)
except Exception as e:
except Exception:
return self.get_zero_action(0.0, current_eef_pos, current_eef_quat)
# Skip transformation if quaternion is invalid
if not is_valid_quaternion(raw_pose[3:]):
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Wait for grip release after reset
if self.require_grip_reengage:
if grip <= self.grip_release_threshold:
self.require_grip_reengage = False
else:
if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Use hysteresis so noisy analog grip values do not accidentally re-enable teleop.
if self.grip_active:
grip_pressed = grip > self.grip_release_threshold
else:
grip_pressed = grip >= self.grip_engage_threshold
# 握持键作为离合器 (Clutch)
# Grip button as clutch with hysteresis
grip_pressed = grip > self.grip_release_threshold if self.grip_active else grip >= self.grip_engage_threshold
if not grip_pressed:
self.prev_xr_pos = None
self.prev_xr_quat = None
# Only snapshot target on the transition frame (grip just released) or first ever frame.
# After that, keep it frozen so IK holds a fixed world-frame position.
if self.grip_active or self.target_eef_pos is None:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = False
# Ensure target tracks the current pose while we are not grabbing
if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy() # wxyz
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
if not self.grip_active:
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
if current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy() # wxyz
self.target_eef_quat = current_eef_quat.copy()
self.grip_active = True
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Since OpenXR headset zero is not guaranteed to match robot zero, we compute the
# raw transformation in World Frame, but apply it relatively to the stored target.
# 1. First, find current XR pose projected into World frame
# Project current and previous XR poses into Isaac Sim world frame
xr_world_pos, xr_world_quat_xyzw = transform_xr_pose(raw_pose[:3], raw_pose[3:])
prev_xr_world_pos, prev_xr_world_quat_xyzw = transform_xr_pose(self.prev_xr_pos, self.prev_xr_quat)
# 2. Extract Delta POS in World frame
# Delta position (world frame), clamped per step
world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity
pos_norm = np.linalg.norm(world_delta_pos)
if pos_norm > self.max_pos_per_step:
world_delta_pos *= self.max_pos_per_step / pos_norm
# 3. Extract Delta ROT in World frame
# Delta rotation (world frame), clamped per step
world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity
rot_norm = np.linalg.norm(world_delta_rot)
if rot_norm > self.max_rot_per_step:
world_delta_rot *= self.max_rot_per_step / rot_norm
# 4. Gripper
gripper_action = 1.0 if trigger > 0.5 else -1.0
if self.is_absolute:
if self.target_eef_pos is None:
self.target_eef_pos = np.zeros(3)
self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])
# Accumulate absolute target
if self.target_eef_pos is None:
self.target_eef_pos = np.zeros(3)
self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])
# Accumulate in world frame so VR direction always matches sim direction.
self.target_eef_pos += world_delta_pos
self.target_eef_pos += world_delta_pos
target_r = _quat_wxyz_to_rotation(self.target_eef_quat)
delta_r = R.from_rotvec(world_delta_rot)
self.target_eef_quat = _rotation_to_quat_wxyz(delta_r * target_r)
action = torch.tensor([
self.target_eef_pos[0], self.target_eef_pos[1], self.target_eef_pos[2],
self.target_eef_quat[0], self.target_eef_quat[1], self.target_eef_quat[2], self.target_eef_quat[3],
gripper_action], dtype=torch.float32)
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
else:
max_pos_delta = 0.05
world_pos_norm = np.linalg.norm(world_delta_pos)
if world_pos_norm > max_pos_delta:
world_delta_pos = world_delta_pos * (max_pos_delta / world_pos_norm)
max_rot_delta = 0.15
world_rot_norm = np.linalg.norm(world_delta_rot)
if world_rot_norm > max_rot_delta:
world_delta_rot = world_delta_rot * (max_rot_delta / world_rot_norm)
action = torch.tensor([
world_delta_pos[0], world_delta_pos[1], world_delta_pos[2],
world_delta_rot[0], world_delta_rot[1], world_delta_rot[2],
gripper_action], dtype=torch.float32)
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
# Apply rotation delta in world frame so controller direction = EEF world direction
target_r = _quat_wxyz_to_rotation(self.target_eef_quat)
self.target_eef_quat = _rotation_to_quat_wxyz(R.from_rotvec(world_delta_rot) * target_r)
self.prev_xr_pos = raw_pose[:3].copy()
self.prev_xr_quat = raw_pose[3:].copy()
self.frame_count += 1
action = torch.tensor([
*self.target_eef_pos,
*self.target_eef_quat,
gripper_action,
], dtype=torch.float32)
if self.frame_count % 30 == 0:
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
print("\n====================== [VR TELEOP DEBUG] ======================")
print(f"| Task Mode: {'ABSOLUTE' if self.is_absolute else 'RELATIVE'}")
print(f"| Raw VR Pos (OpenXR): {np.array(raw_pose[:3])}")
print(f"| Raw VR Quat (xyzw): {np.array(raw_pose[3:])}")
print(f"| XR Delta Pos (world): {world_delta_pos} (norm={np.linalg.norm(world_delta_pos):.4f})")
print(f"| XR Delta Rot (world): {world_delta_rot} (norm={np.linalg.norm(world_delta_rot):.4f})")
if self.is_absolute:
print(f"| Targ Pos (world): {action[:3].numpy()}")
print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}")
else:
print(f"| Sent Action Pos (dx,dy,dz): {action[:3].numpy()}")
print(f"| Sent Action Rot (rx,ry,rz): {action[3:6].numpy()}")
print(f"| Targ Pos (world): {action[:3].numpy()}")
print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}")
print(f"| Gripper & Trigger: Grip={grip:.2f}, Trig={trigger:.2f}")
print("===============================================================")
return action
# =====================================================================
# Main Execution Loop
# =====================================================================
def main() -> None:
"""Run teleoperation with PICO XR Controller against Isaac Lab environment."""
# 1. Configuration parsing
env_cfg = parse_env_cfg(args_cli.task, num_envs=args_cli.num_envs)
env_cfg.env_name = args_cli.task
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
raise ValueError(f"Teleoperation requires ManagerBasedRLEnvCfg. Got: {type(env_cfg)}")
if "Abs" not in args_cli.task:
raise ValueError(
f"Task '{args_cli.task}' is not an absolute IK task. "
"Only Abs tasks are supported (e.g. Isaac-MindRobot-LeftArm-IK-Abs-v0)."
)
env_cfg.terminations.time_out = None
# 2. Environment creation
try:
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
except Exception as e:
@@ -345,19 +316,41 @@ def main() -> None:
simulation_app.close()
return
# 3. Teleoperation Interface Initialization
is_abs_mode = "Abs" in args_cli.task
print(f"\n[INFO] Connecting to PICO XR Headset using {args_cli.arm} controller...")
print(f"[INFO] Using IK Mode: {'ABSOLUTE' if is_abs_mode else 'RELATIVE'}")
teleop_interface = XrTeleopController(
arm=args_cli.arm,
pos_sensitivity=args_cli.sensitivity,
rot_sensitivity=args_cli.sensitivity * (1.0 if is_abs_mode else 0.3), # Absolute rotation handles 1:1 better than relative
is_absolute=is_abs_mode
)
# Detect single-arm vs dual-arm based on action manager term names
is_dual_arm = "left_arm_action" in env.action_manager._terms
# Sensitivity: explicit args override --sensitivity, which overrides defaults
pos_sens = 1.0
rot_sens = 0.3
if args_cli.sensitivity is not None:
pos_sens = args_cli.sensitivity
rot_sens = args_cli.sensitivity
if args_cli.pos_sensitivity is not None:
pos_sens = args_cli.pos_sensitivity
if args_cli.rot_sensitivity is not None:
rot_sens = args_cli.rot_sensitivity
if is_dual_arm:
print(f"\n[INFO] Dual-arm mode detected. Using both controllers.")
print(f"[INFO] IK Mode: ABSOLUTE")
print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}")
shared_client = XrClient()
teleop_left = XrTeleopController(arm="left", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client)
teleop_right = XrTeleopController(arm="right", pos_sensitivity=pos_sens, rot_sensitivity=rot_sens, xr_client=shared_client)
teleop_interface = teleop_left # primary interface for callbacks/reset
teleop_right_ref = teleop_right
else:
print(f"\n[INFO] Connecting to PICO XR Headset using {args_cli.arm} controller...")
print(f"[INFO] IK Mode: ABSOLUTE")
print(f"[INFO] Sensitivity: pos={pos_sens:.3f} rot={rot_sens:.3f}")
teleop_interface = XrTeleopController(
arm=args_cli.arm,
pos_sensitivity=pos_sens,
rot_sensitivity=rot_sens,
)
should_reset = False
def request_reset():
nonlocal should_reset
should_reset = True
@@ -365,30 +358,34 @@ def main() -> None:
teleop_interface.add_callback("RESET", request_reset)
def get_arm_action_term():
return env.action_manager._terms["arm_action"]
def clear_ik_target_state():
"""Clear the internal IK target so reset does not reuse the previous pose command."""
if not is_abs_mode:
return
arm_action_term = get_arm_action_term()
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
"""Sync IK controller's internal target to current EEF pose to avoid jumps after reset."""
if is_dual_arm:
for term_key in ["left_arm_action", "right_arm_action"]:
arm_action_term = env.action_manager._terms[term_key]
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
else:
arm_action_term = env.action_manager._terms["arm_action"]
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
def convert_action_world_to_root(action_tensor: torch.Tensor) -> torch.Tensor:
"""Convert an absolute IK action from world frame to robot root frame."""
"""Convert absolute IK action from world frame to robot root frame."""
robot = env.unwrapped.scene["robot"]
root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy()
root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy() # wxyz
target_pos_w = action_tensor[:3].numpy()
target_quat_w = action_tensor[3:7].numpy()
root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy()
pos_root, quat_root = world_pose_to_root_frame(
target_pos_w, target_quat_w, root_pos_w, root_quat_w,
action_tensor[:3].numpy(), action_tensor[3:7].numpy(),
root_pos_w, root_quat_w,
)
out = action_tensor.clone()
out[:3] = torch.tensor(pos_root, dtype=torch.float32)
@@ -396,53 +393,43 @@ def main() -> None:
return out
def get_wheel_action() -> torch.Tensor:
"""Read left joystick and return 4-DOF wheel velocity command.
Skid-steer differential drive.
Joystick: Y-axis (+1 = forward), X-axis (+1 = right turn).
Joint order from articulation (terminal log):
[right_b, left_b, left_f, right_f]
Right/left sign convention assumes both sides' joints have the same
axis direction (positive velocity = forward). If the robot drives
backward when pushing forward, negate base_speed in the launch command.
"""
"""Read left joystick 4-DOF wheel velocity [right_b, left_b, left_f, right_f]."""
try:
joy = teleop_interface.xr_client.get_joystick("left")
jy = float(joy[1]) # forward / backward
jx = float(joy[0]) # right / left
jy = float(joy[1])
jx = float(joy[0])
except Exception:
return torch.zeros(4)
v = jy * args_cli.base_speed
omega = jx * args_cli.base_turn
# Positive omega = turn right → left wheels faster, right wheels slower
right_vel = v - omega
left_vel = v + omega
return torch.tensor([right_vel, left_vel, left_vel, right_vel], dtype=torch.float32)
return torch.tensor(
[right_vel, left_vel, left_vel, right_vel], dtype=torch.float32
)
env.reset()
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
if is_dual_arm:
teleop_right_ref.reset()
print("\n" + "=" * 50)
print(" 🚀 Teleoperation Started!")
print(" 🎮 Use the TRIGGER to open/close gripper.")
print(" ✊ Hold GRIP and move the controller to move the arm.")
print(" 🕹️ Left joystick: Y=forward/back, X=turn left/right.")
print(" 🔄 Press B or Y to reset the environment.")
print(" Teleoperation Started!")
if is_dual_arm:
print(" LEFT controller → left arm")
print(" RIGHT controller → right arm")
print(" TRIGGER: open/close gripper")
print(" GRIP (hold): move the arm")
print(" Left joystick: Y=forward/back, X=turn")
print(" B / Y: reset environment")
print("=" * 50 + "\n")
device = env.unwrapped.device
sim_frame = 0
obs, _ = env.reset()
clear_ik_target_state()
last_root_left = None
last_root_right = None
while simulation_app.is_running():
try:
with torch.inference_mode():
@@ -450,66 +437,78 @@ def main() -> None:
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
if is_dual_arm:
teleop_right_ref.reset()
should_reset = False
sim_frame = 0
last_root_left = None
last_root_right = None
continue
# Read current EEF in world frame from observations.
policy_obs = obs["policy"]
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
# Get action from XR Controller (world frame for absolute mode).
action_np = teleop_interface.advance(
current_eef_pos=eef_pos, current_eef_quat=eef_quat,
)
# IK expects root-frame commands; convert just before sending.
if is_abs_mode:
action_np = convert_action_world_to_root(action_np)
# Action manager order: arm_action | wheel_action | gripper_action
# arm=7, wheel=4, gripper=1 → total 12 dims.
wheel_np = get_wheel_action()
action_np = torch.cat([action_np[:7], wheel_np, action_np[7:]])
if is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
eef_quat_right = policy_obs["eef_quat_right"][0].cpu().numpy()
left_action = teleop_left.advance(current_eef_pos=eef_pos_left, current_eef_quat=eef_quat_left)
right_action = teleop_right_ref.advance(current_eef_pos=eef_pos_right, current_eef_quat=eef_quat_right)
# Only recompute root-frame IK target when grip is active; otherwise freeze joints
if teleop_left.grip_active or last_root_left is None:
last_root_left = convert_action_world_to_root(left_action)[:7].clone()
if teleop_right_ref.grip_active or last_root_right is None:
last_root_right = convert_action_world_to_root(right_action)[:7].clone()
# Action layout: left_arm(7) | wheel(4) | left_gripper(1) | right_arm(7) | right_gripper(1)
action_np = torch.cat([
last_root_left, wheel_np, left_action[7:8],
last_root_right, right_action[7:8],
])
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
action_np = teleop_interface.advance(current_eef_pos=eef_pos, current_eef_quat=eef_quat)
if teleop_interface.grip_active or last_root_left is None:
last_root_left = convert_action_world_to_root(action_np)[:7].clone()
# Action layout: arm(7) | wheel(4) | gripper(1)
action_np = torch.cat([last_root_left, wheel_np, action_np[7:8]])
actions = action_np.unsqueeze(0).repeat(env.num_envs, 1).to(device)
# Step environment
obs, _, _, _, _ = env.step(actions)
# Print robot state every 30 frames
sim_frame += 1
if sim_frame % 30 == 0:
np.set_printoptions(precision=4, suppress=True, floatmode='fixed')
policy_obs = obs["policy"]
joint_pos = policy_obs["joint_pos"][0].cpu().numpy()
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
last_act = policy_obs["actions"][0].cpu().numpy()
# On first print, dump ALL joint names + positions to identify indices
if sim_frame == 30:
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
print(f"\n{'='*70}")
print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS (relative)")
print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS")
print(f"{'='*70}")
for i, name in enumerate(jnames):
print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}")
print(f"{'='*70}")
# Find arm joint indices dynamically by looking at the first 6-7 joints that aren't fingers or hands
arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
print(f" Deduced left arm indices: {arm_idx}")
print(f"{'='*70}\n")
# Get arm indices (cache-friendly: find once)
if not hasattr(env, '_arm_idx_cache'):
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
env._arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
arm_idx = env._arm_idx_cache
arm_joints = joint_pos[arm_idx]
env._right_arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("r_joint")]
arm_joints = joint_pos[env._arm_idx_cache]
right_arm_joints = joint_pos[env._right_arm_idx_cache]
try:
joy_dbg = teleop_interface.xr_client.get_joystick("left")
@@ -517,21 +516,59 @@ def main() -> None:
except Exception:
joy_str = "N/A"
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| EEF Pos (world): {eef_pos}")
print(f"| EEF Quat (world, wxyz): {eef_quat}")
print(f"| Last Action Sent: {last_act}")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
if is_dual_arm:
eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy()
eef_quat_left = policy_obs["eef_quat_left"][0].cpu().numpy()
eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy()
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| Right Arm Joints (rad): {right_arm_joints}")
print(f"| Left EEF Pos (world, m): {eef_pos_left}")
print(f"| Right EEF Pos (world, m): {eef_pos_right}")
print(f"| Cmd left_arm(abs): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd left_gripper: {last_act[11:12]} (+1=open -1=close)")
print(f"| Cmd right_arm(abs): {last_act[12:19]}")
print(f"| Cmd right_gripper: {last_act[19:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
else:
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
if not hasattr(env, '_prev_eef_quat'):
env._prev_eef_quat = eef_quat.copy()
prev_q = env._prev_eef_quat
r_prev = R.from_quat([prev_q[1], prev_q[2], prev_q[3], prev_q[0]])
r_curr = R.from_quat([eef_quat[1], eef_quat[2], eef_quat[3], eef_quat[0]])
delta_rotvec = (r_curr * r_prev.inv()).as_rotvec()
delta_angle_deg = np.degrees(np.linalg.norm(delta_rotvec))
delta_axis = delta_rotvec / (np.linalg.norm(delta_rotvec) + 1e-9)
env._prev_eef_quat = eef_quat.copy()
approach_vec = r_curr.apply(np.array([1.0, 0.0, 0.0]))
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| EEF Pos (world, m): {eef_pos}")
print(f"| EEF approach vec (world): [{approach_vec[0]:+.3f}, {approach_vec[1]:+.3f}, {approach_vec[2]:+.3f}] (夹爪朝向)")
print(f"| EEF rot since last print: {delta_angle_deg:+.2f}° around [{delta_axis[0]:+.3f},{delta_axis[1]:+.3f},{delta_axis[2]:+.3f}] (world)")
print(f"| Cmd arm(abs pos+quat): {last_act[:7]}")
print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}")
print(f"| Cmd gripper: {last_act[11:]} (+1=open -1=close)")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
teleop_interface.close()
if is_dual_arm:
teleop_right_ref.close()
shared_client.close()
env.close()
if __name__ == "__main__":
main()
simulation_app.close()