Files
yt lee bfe28b188a Fix XR teleop: body-frame IK control for mobile chassis
Switch arm IK from world-frame to body-frame control so that
pushing the XR controller forward always moves the arm in the
robot's forward direction, regardless of chassis rotation.

Key changes:
- dual_arm_agent: convert EEF observations to body frame before
  passing to XR controller; send body-frame IK targets directly
  (removed convert_action_world_to_root)
- xr_controller: XR deltas treated as body-frame deltas (no yaw
  rotation needed — VR view tracks robot heading naturally)
- streaming: add debug frame save for stereo alignment diagnostics
- mindrobot_2i_cfg: IdealPDActuator for trunk, disabled gravity
- Author headers updated to Yutang Li, SIAT

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 11:32:28 +08:00

137 lines
4.8 KiB
Python

# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Base teleoperation agent with main loop, env management, and reset handling."""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
import torch
logger = logging.getLogger(__name__)
class BaseTeleopAgent(ABC):
"""Abstract base for XR teleoperation agents.
Provides the main simulation loop, env lifecycle, and reset handling.
Subclasses implement assemble_action() to define their control dimensions.
"""
def __init__(self, env, simulation_app, *, debug_viewports: bool = True):
self.env = env
self.simulation_app = simulation_app
self.device = env.unwrapped.device
self.num_envs = env.num_envs
self._should_reset = False
self.sim_frame = 0
# Viewport management
self._robot_viewports: dict[str, object] = {}
self._robot_cam_prims: dict[str, str] = {
"Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand",
"Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand",
"Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft",
"Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest",
}
if debug_viewports:
self._create_viewports()
def request_reset(self):
self._should_reset = True
print("[INFO] Reset requested via XR button.")
def _create_viewports(self):
try:
import omni.kit.viewport.utility as vp_util
except ImportError:
logger.warning("[Viewport] omni.kit.viewport.utility not available.")
return
for name, cam_path in self._robot_cam_prims.items():
vp_win = vp_util.create_viewport_window(f"Robot {name} View", width=640, height=360)
vp_win.viewport_api.camera_path = cam_path
self._robot_viewports[name] = vp_win
print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}")
def _rebind_viewports(self):
for name, vp_win in self._robot_viewports.items():
vp_win.viewport_api.camera_path = self._robot_cam_prims[name]
def _clear_ik_target_state(self):
"""Sync IK controller internals to current EEF pose to prevent jumps after reset."""
action_terms = self.env.action_manager._terms
for key in self._ik_action_term_names():
if key in action_terms:
term = action_terms[key]
ee_pos_b, ee_quat_b = term._compute_frame_pose()
term._raw_actions.zero_()
term._processed_actions.zero_()
term._ik_controller._command.zero_()
term._ik_controller.ee_pos_des[:] = ee_pos_b
term._ik_controller.ee_quat_des[:] = ee_quat_b
def _ik_action_term_names(self) -> list[str]:
"""Return IK action term names to clear on reset. Override in subclass."""
return ["arm_action"]
def _do_reset(self, obs):
"""Execute reset sequence. Returns new obs."""
obs, _ = self.env.reset()
self._clear_ik_target_state()
self._rebind_viewports()
self._should_reset = False
self.sim_frame = 0
self.on_reset()
return obs
def on_reset(self):
"""Hook for subclasses to reset their own state."""
pass
@abstractmethod
def assemble_action(self, obs) -> torch.Tensor:
"""Build the full action tensor for env.step(). Must be implemented by subclass."""
...
def post_step(self, obs):
"""Hook called after env.step(). Subclasses can add streaming, diagnostics, etc."""
pass
def run(self):
"""Main simulation loop."""
obs, _ = self.env.reset()
self._clear_ik_target_state()
self.on_reset()
self._print_banner()
while self.simulation_app.is_running():
try:
with torch.inference_mode():
if self._should_reset:
obs = self._do_reset(obs)
continue
action = self.assemble_action(obs)
actions = action.unsqueeze(0).repeat(self.num_envs, 1).to(self.device)
obs, _, _, _, _ = self.env.step(actions)
self.sim_frame += 1
self.post_step(obs)
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
self.cleanup()
def _print_banner(self):
print("\n" + "=" * 50)
print(" Teleoperation Started!")
print("=" * 50 + "\n")
def cleanup(self):
"""Release resources. Override in subclass to close controllers, streamers, etc."""
self.env.close()