This commit is contained in:
2026-03-06 17:31:00 +08:00
parent 36b4273582
commit 2b290d805a
3 changed files with 667 additions and 492 deletions

View File

@@ -45,6 +45,14 @@ parser.add_argument(
choices=["left", "right"],
help="Which arm/controller to use.",
)
parser.add_argument(
"--base_speed", type=float, default=3.0,
help="Max wheel speed (rad/s) for joystick full forward.",
)
parser.add_argument(
"--base_turn", type=float, default=2.0,
help="Max wheel differential (rad/s) for joystick full left/right.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
@@ -78,6 +86,31 @@ from xr_utils.geometry import R_HEADSET_TO_WORLD
# Teleoperation Interface for XR
# =====================================================================
def _quat_wxyz_to_rotation(quat_wxyz: np.ndarray) -> R:
"""Convert Isaac-style wxyz quaternion to scipy Rotation."""
return R.from_quat([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])
def _rotation_to_quat_wxyz(rot: R) -> np.ndarray:
"""Convert scipy Rotation quaternion to Isaac-style wxyz."""
quat_xyzw = rot.as_quat()
return np.array([quat_xyzw[3], quat_xyzw[0], quat_xyzw[1], quat_xyzw[2]])
def world_pose_to_root_frame(
pos_w: np.ndarray,
quat_wxyz: np.ndarray,
root_pos_w: np.ndarray,
root_quat_wxyz: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Express a world-frame pose in the robot root frame."""
root_rot = _quat_wxyz_to_rotation(root_quat_wxyz)
pose_rot = _quat_wxyz_to_rotation(quat_wxyz)
pos_root = root_rot.inv().apply(pos_w - root_pos_w)
quat_root = _rotation_to_quat_wxyz(root_rot.inv() * pose_rot)
return pos_root, quat_root
class XrTeleopController:
"""Teleop controller for PICO XR headset."""
@@ -105,6 +138,10 @@ class XrTeleopController:
self.grip_active = False
self.frame_count = 0
self.reset_button_latched = False
self.require_grip_reengage = False
self.grip_engage_threshold = 0.8
self.grip_release_threshold = 0.2
# Callbacks (like reset, etc)
self.callbacks = {}
@@ -117,6 +154,11 @@ class XrTeleopController:
self.prev_xr_quat = None
self.grip_active = False
self.frame_count = 0
self.target_eef_pos = None
self.target_eef_quat = None
# Require one grip release after reset so stale controller motion
# cannot immediately drive the robot back toward the previous pose.
self.require_grip_reengage = True
def close(self):
self.xr_client.close()
@@ -144,14 +186,17 @@ class XrTeleopController:
Reads the XR controller.
Relative bounds return 7D action tensor: [dx, dy, dz, drx, dry, drz, gripper]
Absolute bounds return 8D action tensor: [x, y, z, qw, qx, qy, qz, gripper]
Note: current_eef_quat expects wxyz.
Note: in absolute mode current_eef_* and the returned target are in WORLD frame.
The caller is responsible for converting to root frame before sending to IK.
"""
# XR buttons check (e.g. A or B for reset)
try:
if self.xr_client.get_button("B") or self.xr_client.get_button("Y"):
reset_pressed = self.xr_client.get_button("B") or self.xr_client.get_button("Y")
if reset_pressed and not self.reset_button_latched:
if "RESET" in self.callbacks:
self.callbacks["RESET"]()
except:
self.reset_button_latched = reset_pressed
except Exception:
pass
try:
@@ -165,11 +210,23 @@ class XrTeleopController:
if not is_valid_quaternion(raw_pose[3:]):
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Transform XR pose pos directly via Matrix mapping for safety check (though we can map diffs next)
pos_w = self.R_headset_world @ raw_pose[:3]
if self.require_grip_reengage:
if grip <= self.grip_release_threshold:
self.require_grip_reengage = False
else:
if self.is_absolute and current_eef_pos is not None and current_eef_quat is not None:
self.target_eef_pos = current_eef_pos.copy()
self.target_eef_quat = current_eef_quat.copy()
return self.get_zero_action(trigger, current_eef_pos, current_eef_quat)
# Use hysteresis so noisy analog grip values do not accidentally re-enable teleop.
if self.grip_active:
grip_pressed = grip > self.grip_release_threshold
else:
grip_pressed = grip >= self.grip_engage_threshold
# 握持键作为离合器 (Clutch)
if grip < 0.5:
if not grip_pressed:
self.prev_xr_pos = None
self.prev_xr_quat = None
self.grip_active = False
@@ -197,11 +254,9 @@ class XrTeleopController:
# 2. Extract Delta POS in World frame
world_delta_pos = (xr_world_pos - prev_xr_world_pos) * self.pos_sensitivity
pos_norm = np.linalg.norm((raw_pose[:3] - self.prev_xr_pos))
# 3. Extract Delta ROT in World frame
world_delta_rot = quat_diff_as_rotvec_xyzw(prev_xr_world_quat_xyzw, xr_world_quat_xyzw) * self.rot_sensitivity
rot_norm = np.linalg.norm(quat_diff_as_rotvec_xyzw(self.prev_xr_quat, raw_pose[3:]))
# 4. Gripper
gripper_action = 1.0 if trigger > 0.5 else -1.0
@@ -211,16 +266,12 @@ class XrTeleopController:
self.target_eef_pos = np.zeros(3)
self.target_eef_quat = np.array([1.0, 0.0, 0.0, 0.0])
# Position update (simple translation in world frame)
# Accumulate in world frame so VR direction always matches sim direction.
self.target_eef_pos += world_delta_pos
# Rotation update: apply delta_R (in world frame) to target_R (in world frame)
# R_new = R_delta @ R_target
target_r = R.from_quat([self.target_eef_quat[1], self.target_eef_quat[2], self.target_eef_quat[3], self.target_eef_quat[0]])
target_r = _quat_wxyz_to_rotation(self.target_eef_quat)
delta_r = R.from_rotvec(world_delta_rot)
new_r = delta_r * target_r
new_rot_xyzw = new_r.as_quat()
self.target_eef_quat = np.array([new_rot_xyzw[3], new_rot_xyzw[0], new_rot_xyzw[1], new_rot_xyzw[2]])
self.target_eef_quat = _rotation_to_quat_wxyz(delta_r * target_r)
action = torch.tensor([
self.target_eef_pos[0], self.target_eef_pos[1], self.target_eef_pos[2],
@@ -231,14 +282,15 @@ class XrTeleopController:
self.prev_xr_quat = raw_pose[3:].copy()
else:
# Relative clamping limits (used by relative mode to avoid divergence)
max_pos_delta = 0.04
if pos_norm > max_pos_delta:
world_delta_pos = (world_delta_pos * max_pos_delta / pos_norm)
max_pos_delta = 0.05
world_pos_norm = np.linalg.norm(world_delta_pos)
if world_pos_norm > max_pos_delta:
world_delta_pos = world_delta_pos * (max_pos_delta / world_pos_norm)
max_rot_delta = 0.02
if rot_norm > max_rot_delta:
world_delta_rot = (world_delta_rot * max_rot_delta / rot_norm)
max_rot_delta = 0.15
world_rot_norm = np.linalg.norm(world_delta_rot)
if world_rot_norm > max_rot_delta:
world_delta_rot = world_delta_rot * (max_rot_delta / world_rot_norm)
action = torch.tensor([
world_delta_pos[0], world_delta_pos[1], world_delta_pos[2],
@@ -256,11 +308,11 @@ class XrTeleopController:
print(f"| Task Mode: {'ABSOLUTE' if self.is_absolute else 'RELATIVE'}")
print(f"| Raw VR Pos (OpenXR): {np.array(raw_pose[:3])}")
print(f"| Raw VR Quat (xyzw): {np.array(raw_pose[3:])}")
print(f"| XR Delta Pos (world): {world_delta_pos} (norm={pos_norm:.4f})")
print(f"| XR Delta Rot (world): {world_delta_rot} (norm={rot_norm:.4f})")
print(f"| XR Delta Pos (world): {world_delta_pos} (norm={np.linalg.norm(world_delta_pos):.4f})")
print(f"| XR Delta Rot (world): {world_delta_rot} (norm={np.linalg.norm(world_delta_rot):.4f})")
if self.is_absolute:
print(f"| Targ Pos (dx,dy,dz): {action[:3].numpy()}")
print(f"| Targ Quat (wxyz): {action[3:7].numpy()}")
print(f"| Targ Pos (world): {action[:3].numpy()}")
print(f"| Targ Quat (world, wxyz): {action[3:7].numpy()}")
else:
print(f"| Sent Action Pos (dx,dy,dz): {action[:3].numpy()}")
print(f"| Sent Action Rot (rx,ry,rz): {action[3:6].numpy()}")
@@ -313,30 +365,114 @@ def main() -> None:
teleop_interface.add_callback("RESET", request_reset)
def get_arm_action_term():
return env.action_manager._terms["arm_action"]
def clear_ik_target_state():
"""Clear the internal IK target so reset does not reuse the previous pose command."""
if not is_abs_mode:
return
arm_action_term = get_arm_action_term()
ee_pos_b, ee_quat_b = arm_action_term._compute_frame_pose()
arm_action_term._raw_actions.zero_()
arm_action_term._processed_actions.zero_()
arm_action_term._ik_controller._command.zero_()
arm_action_term._ik_controller.ee_pos_des[:] = ee_pos_b
arm_action_term._ik_controller.ee_quat_des[:] = ee_quat_b
def convert_action_world_to_root(action_tensor: torch.Tensor) -> torch.Tensor:
"""Convert an absolute IK action from world frame to robot root frame."""
robot = env.unwrapped.scene["robot"]
root_pos_w = robot.data.root_pos_w[0].detach().cpu().numpy()
root_quat_w = robot.data.root_quat_w[0].detach().cpu().numpy() # wxyz
target_pos_w = action_tensor[:3].numpy()
target_quat_w = action_tensor[3:7].numpy()
pos_root, quat_root = world_pose_to_root_frame(
target_pos_w, target_quat_w, root_pos_w, root_quat_w,
)
out = action_tensor.clone()
out[:3] = torch.tensor(pos_root, dtype=torch.float32)
out[3:7] = torch.tensor(quat_root, dtype=torch.float32)
return out
def get_wheel_action() -> torch.Tensor:
"""Read left joystick and return 4-DOF wheel velocity command.
Skid-steer differential drive.
Joystick: Y-axis (+1 = forward), X-axis (+1 = right turn).
Joint order from articulation (terminal log):
[right_b, left_b, left_f, right_f]
Right/left sign convention assumes both sides' joints have the same
axis direction (positive velocity = forward). If the robot drives
backward when pushing forward, negate base_speed in the launch command.
"""
try:
joy = teleop_interface.xr_client.get_joystick("left")
jy = float(joy[1]) # forward / backward
jx = float(joy[0]) # right / left
except Exception:
return torch.zeros(4)
v = jy * args_cli.base_speed
omega = jx * args_cli.base_turn
# Positive omega = turn right → left wheels faster, right wheels slower
right_vel = v - omega
left_vel = v + omega
return torch.tensor(
[right_vel, left_vel, left_vel, right_vel], dtype=torch.float32
)
env.reset()
clear_ik_target_state()
teleop_interface.reset()
print("\n" + "=" * 50)
print(" 🚀 Teleoperation Started!")
print(" 🎮 Use the TRIGGER to open/close gripper.")
print(" ✊ Hold GRIP button and move the controller to move the arm.")
print(" 🕹️ Press B or Y to reset the environment.")
print(" ✊ Hold GRIP and move the controller to move the arm.")
print(" 🕹️ Left joystick: Y=forward/back, X=turn left/right.")
print(" 🔄 Press B or Y to reset the environment.")
print("=" * 50 + "\n")
device = env.unwrapped.device
sim_frame = 0
obs, _ = env.reset()
clear_ik_target_state()
while simulation_app.is_running():
try:
with torch.inference_mode():
# Extract tracking pose to seed absolute IK
if should_reset:
obs, _ = env.reset()
clear_ik_target_state()
teleop_interface.reset()
should_reset = False
sim_frame = 0
continue
# Read current EEF in world frame from observations.
policy_obs = obs["policy"]
eef_pos = policy_obs["eef_pos"][0].cpu().numpy()
eef_quat = policy_obs["eef_quat"][0].cpu().numpy()
# Get action from XR Controller
action_np = teleop_interface.advance(current_eef_pos=eef_pos, current_eef_quat=eef_quat)
# Get action from XR Controller (world frame for absolute mode).
action_np = teleop_interface.advance(
current_eef_pos=eef_pos, current_eef_quat=eef_quat,
)
# IK expects root-frame commands; convert just before sending.
if is_abs_mode:
action_np = convert_action_world_to_root(action_np)
# Action manager order: arm_action | wheel_action | gripper_action
# arm=7, wheel=4, gripper=1 → total 12 dims.
wheel_np = get_wheel_action()
action_np = torch.cat([action_np[:7], wheel_np, action_np[7:]])
actions = action_np.unsqueeze(0).repeat(env.num_envs, 1).to(device)
# Step environment
@@ -363,30 +499,32 @@ def main() -> None:
print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}")
print(f"{'='*70}")
# Find arm joint indices dynamically by looking at the first 6-7 joints that aren't fingers or hands
arm_idx = [i for i, n in enumerate(jnames) if "finger" not in n and "hand" not in n][:7]
print(f" Deduced arm indices: {arm_idx}")
arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
print(f" Deduced left arm indices: {arm_idx}")
print(f"{'='*70}\n")
# Get arm indices (cache-friendly: find once)
if not hasattr(env, '_arm_idx_cache'):
robot = env.unwrapped.scene["robot"]
jnames = robot.joint_names
env._arm_idx_cache = [i for i, n in enumerate(jnames) if "finger" not in n and "hand" not in n][:7]
env._arm_idx_cache = [i for i, n in enumerate(jnames) if n.startswith("l_joint")]
arm_idx = env._arm_idx_cache
arm_joints = joint_pos[arm_idx]
try:
joy_dbg = teleop_interface.xr_client.get_joystick("left")
joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]"
except Exception:
joy_str = "N/A"
print(f"\n---------------- [ROBOT STATE frame={sim_frame}] ----------------")
print(f"| Left Arm Joints (rad): {arm_joints}")
print(f"| EEF Pos (world): {eef_pos}")
print(f"| EEF Quat (world, wxyz): {eef_quat}")
print(f"| Last Action Sent: {last_act}")
print(f"| Joystick (x=turn,y=fwd): {joy_str}")
print(f"----------------------------------------------------------------")
if should_reset:
env.reset()
teleop_interface.reset()
should_reset = False
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break