Switch arm IK from world-frame to body-frame control so that pushing the XR controller forward always moves the arm in the robot's forward direction, regardless of chassis rotation. Key changes: - dual_arm_agent: convert EEF observations to body frame before passing to XR controller; send body-frame IK targets directly (removed convert_action_world_to_root) - xr_controller: XR deltas treated as body-frame deltas (no yaw rotation needed — VR view tracks robot heading naturally) - streaming: add debug frame save for stereo alignment diagnostics - mindrobot_2i_cfg: IdealPDActuator for trunk, disabled gravity - Author headers updated to Yutang Li, SIAT Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
140 lines
5.7 KiB
Python
140 lines
5.7 KiB
Python
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
"""Dual-arm XR teleoperation agent with chassis control."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import torch
|
|
|
|
from xr_utils import XrClient
|
|
|
|
from .base_agent import BaseTeleopAgent
|
|
from .xr_controller import XrTeleopController
|
|
from .chassis import ChassisController
|
|
from .frame_utils import convert_action_world_to_root
|
|
|
|
|
|
class DualArmXrAgent(BaseTeleopAgent):
|
|
"""Dual-arm teleoperation with chassis joystick control.
|
|
|
|
Action: left_arm(7) | wheel(4) | left_grip(1) | right_arm(7) | right_grip(1) = 20D
|
|
"""
|
|
|
|
def __init__(self, env, simulation_app, *,
|
|
pos_sensitivity: float = 1.0,
|
|
rot_sensitivity: float = 0.3,
|
|
base_speed: float = 5.0,
|
|
base_turn: float = 2.0,
|
|
drive_speed: float = 0.5,
|
|
drive_turn: float = 1.5,
|
|
debug_viewports: bool = True):
|
|
super().__init__(env, simulation_app, debug_viewports=debug_viewports)
|
|
|
|
self.shared_client = XrClient()
|
|
self.teleop_left = XrTeleopController(
|
|
arm="left", pos_sensitivity=pos_sensitivity,
|
|
rot_sensitivity=rot_sensitivity, xr_client=self.shared_client,
|
|
)
|
|
self.teleop_right = XrTeleopController(
|
|
arm="right", pos_sensitivity=pos_sensitivity,
|
|
rot_sensitivity=rot_sensitivity, xr_client=self.shared_client,
|
|
)
|
|
self.teleop_left.add_callback("RESET", self.request_reset)
|
|
|
|
self.chassis = ChassisController(
|
|
base_speed=base_speed, base_turn=base_turn,
|
|
drive_speed=drive_speed, drive_turn=drive_turn,
|
|
)
|
|
|
|
self._last_root_left = None
|
|
self._last_root_right = None
|
|
|
|
@property
|
|
def xr_client(self):
|
|
return self.shared_client
|
|
|
|
def _ik_action_term_names(self) -> list[str]:
|
|
return ["left_arm_action", "right_arm_action"]
|
|
|
|
def on_reset(self):
|
|
self.teleop_left.reset()
|
|
self.teleop_right.reset()
|
|
self._last_root_left = None
|
|
self._last_root_right = None
|
|
|
|
def assemble_action(self, obs) -> torch.Tensor:
|
|
policy_obs = obs["policy"]
|
|
robot = self.env.unwrapped.scene["robot"]
|
|
|
|
# Get robot root pose for world→body EEF conversion
|
|
root_pos_w = robot.data.root_pos_w[0].cpu().numpy()
|
|
root_quat_w = robot.data.root_quat_w[0].cpu().numpy()
|
|
|
|
# Read chassis
|
|
wheel_cmd, self._v_fwd, self._omega = self.chassis.get_commands(self.shared_client)
|
|
|
|
# Convert EEF observations from world frame to body frame
|
|
from .frame_utils import world_pose_to_root_frame
|
|
eef_pos_left_w = policy_obs["eef_pos_left"][0].cpu().numpy()
|
|
eef_quat_left_w = policy_obs["eef_quat_left"][0].cpu().numpy()
|
|
eef_pos_left_b, eef_quat_left_b = world_pose_to_root_frame(
|
|
eef_pos_left_w, eef_quat_left_w, root_pos_w, root_quat_w)
|
|
|
|
eef_pos_right_w = policy_obs["eef_pos_right"][0].cpu().numpy()
|
|
eef_quat_right_w = policy_obs["eef_quat_right"][0].cpu().numpy()
|
|
eef_pos_right_b, eef_quat_right_b = world_pose_to_root_frame(
|
|
eef_pos_right_w, eef_quat_right_w, root_pos_w, root_quat_w)
|
|
|
|
# XR controller works in body frame:
|
|
# - current_eef is body frame (converted above)
|
|
# - XR delta is treated as body-frame delta directly (no rotation)
|
|
# because in VR teleop the user's visual frame tracks the robot heading
|
|
# - output is body frame → send directly to IK
|
|
left_action = self.teleop_left.advance(current_eef_pos=eef_pos_left_b, current_eef_quat=eef_quat_left_b)
|
|
right_action = self.teleop_right.advance(current_eef_pos=eef_pos_right_b, current_eef_quat=eef_quat_right_b)
|
|
|
|
# Joint-locking: target is already in body frame, use directly
|
|
if self.teleop_left.grip_active or self._last_root_left is None:
|
|
self._last_root_left = left_action[:7].clone()
|
|
if self.teleop_right.grip_active or self._last_root_right is None:
|
|
self._last_root_right = right_action[:7].clone()
|
|
|
|
# Diagnostic: print once per second when left grip active
|
|
_cnt = getattr(self, '_diag_cnt', 0) + 1
|
|
self._diag_cnt = _cnt
|
|
if self.teleop_left.grip_active and _cnt % 30 == 0:
|
|
import math
|
|
print(f"[FRAME DIAG] root_quat_w={root_quat_w}")
|
|
print(f" eef_left_world=({eef_pos_left_w[0]:.3f},{eef_pos_left_w[1]:.3f},{eef_pos_left_w[2]:.3f})")
|
|
print(f" eef_left_body =({eef_pos_left_b[0]:.3f},{eef_pos_left_b[1]:.3f},{eef_pos_left_b[2]:.3f})")
|
|
print(f" ik_cmd_body =({self._last_root_left[0]:.3f},{self._last_root_left[1]:.3f},{self._last_root_left[2]:.3f})")
|
|
|
|
# left_arm(7) | wheel(4) | left_grip(1) | right_arm(7) | right_grip(1)
|
|
return torch.cat([
|
|
self._last_root_left, wheel_cmd, left_action[7:8],
|
|
self._last_root_right, right_action[7:8],
|
|
])
|
|
|
|
def post_step(self, obs):
|
|
# Apply direct base velocity override for skid-steer
|
|
robot = self.env.unwrapped.scene["robot"]
|
|
self.chassis.apply_base_velocity(robot, self._v_fwd, self._omega, self.num_envs, self.device)
|
|
|
|
def cleanup(self):
|
|
self.teleop_left.close()
|
|
self.teleop_right.close()
|
|
self.shared_client.close()
|
|
super().cleanup()
|
|
|
|
def _print_banner(self):
|
|
print("\n" + "=" * 50)
|
|
print(" Teleoperation Started!")
|
|
print(" LEFT controller -> left arm")
|
|
print(" RIGHT controller -> right arm")
|
|
print(" TRIGGER: open/close gripper")
|
|
print(" GRIP (hold): move the arm")
|
|
print(" Left joystick: Y=forward/back, X=turn")
|
|
print(" Y (left controller): reset environment")
|
|
print("=" * 50 + "\n")
|