Switch arm IK from world-frame to body-frame control so that pushing the XR controller forward always moves the arm in the robot's forward direction, regardless of chassis rotation. Key changes: - dual_arm_agent: convert EEF observations to body frame before passing to XR controller; send body-frame IK targets directly (removed convert_action_world_to_root) - xr_controller: XR deltas treated as body-frame deltas (no yaw rotation needed — VR view tracks robot heading naturally) - streaming: add debug frame save for stereo alignment diagnostics - mindrobot_2i_cfg: IdealPDActuator for trunk, disabled gravity - Author headers updated to Yutang Li, SIAT Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
137 lines
4.8 KiB
Python
137 lines
4.8 KiB
Python
# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
"""Base teleoperation agent with main loop, env management, and reset handling."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from abc import ABC, abstractmethod
|
|
|
|
import torch
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BaseTeleopAgent(ABC):
|
|
"""Abstract base for XR teleoperation agents.
|
|
|
|
Provides the main simulation loop, env lifecycle, and reset handling.
|
|
Subclasses implement assemble_action() to define their control dimensions.
|
|
"""
|
|
|
|
def __init__(self, env, simulation_app, *, debug_viewports: bool = True):
|
|
self.env = env
|
|
self.simulation_app = simulation_app
|
|
self.device = env.unwrapped.device
|
|
self.num_envs = env.num_envs
|
|
self._should_reset = False
|
|
self.sim_frame = 0
|
|
|
|
# Viewport management
|
|
self._robot_viewports: dict[str, object] = {}
|
|
self._robot_cam_prims: dict[str, str] = {
|
|
"Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand",
|
|
"Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand",
|
|
"Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft",
|
|
"Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest",
|
|
}
|
|
if debug_viewports:
|
|
self._create_viewports()
|
|
|
|
def request_reset(self):
|
|
self._should_reset = True
|
|
print("[INFO] Reset requested via XR button.")
|
|
|
|
def _create_viewports(self):
|
|
try:
|
|
import omni.kit.viewport.utility as vp_util
|
|
except ImportError:
|
|
logger.warning("[Viewport] omni.kit.viewport.utility not available.")
|
|
return
|
|
for name, cam_path in self._robot_cam_prims.items():
|
|
vp_win = vp_util.create_viewport_window(f"Robot {name} View", width=640, height=360)
|
|
vp_win.viewport_api.camera_path = cam_path
|
|
self._robot_viewports[name] = vp_win
|
|
print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}")
|
|
|
|
def _rebind_viewports(self):
|
|
for name, vp_win in self._robot_viewports.items():
|
|
vp_win.viewport_api.camera_path = self._robot_cam_prims[name]
|
|
|
|
def _clear_ik_target_state(self):
|
|
"""Sync IK controller internals to current EEF pose to prevent jumps after reset."""
|
|
action_terms = self.env.action_manager._terms
|
|
for key in self._ik_action_term_names():
|
|
if key in action_terms:
|
|
term = action_terms[key]
|
|
ee_pos_b, ee_quat_b = term._compute_frame_pose()
|
|
term._raw_actions.zero_()
|
|
term._processed_actions.zero_()
|
|
term._ik_controller._command.zero_()
|
|
term._ik_controller.ee_pos_des[:] = ee_pos_b
|
|
term._ik_controller.ee_quat_des[:] = ee_quat_b
|
|
|
|
def _ik_action_term_names(self) -> list[str]:
|
|
"""Return IK action term names to clear on reset. Override in subclass."""
|
|
return ["arm_action"]
|
|
|
|
def _do_reset(self, obs):
|
|
"""Execute reset sequence. Returns new obs."""
|
|
obs, _ = self.env.reset()
|
|
self._clear_ik_target_state()
|
|
self._rebind_viewports()
|
|
self._should_reset = False
|
|
self.sim_frame = 0
|
|
self.on_reset()
|
|
return obs
|
|
|
|
def on_reset(self):
|
|
"""Hook for subclasses to reset their own state."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def assemble_action(self, obs) -> torch.Tensor:
|
|
"""Build the full action tensor for env.step(). Must be implemented by subclass."""
|
|
...
|
|
|
|
def post_step(self, obs):
|
|
"""Hook called after env.step(). Subclasses can add streaming, diagnostics, etc."""
|
|
pass
|
|
|
|
def run(self):
|
|
"""Main simulation loop."""
|
|
obs, _ = self.env.reset()
|
|
self._clear_ik_target_state()
|
|
self.on_reset()
|
|
|
|
self._print_banner()
|
|
|
|
while self.simulation_app.is_running():
|
|
try:
|
|
with torch.inference_mode():
|
|
if self._should_reset:
|
|
obs = self._do_reset(obs)
|
|
continue
|
|
|
|
action = self.assemble_action(obs)
|
|
actions = action.unsqueeze(0).repeat(self.num_envs, 1).to(self.device)
|
|
obs, _, _, _, _ = self.env.step(actions)
|
|
|
|
self.sim_frame += 1
|
|
self.post_step(obs)
|
|
except Exception as e:
|
|
logger.error(f"Error during simulation step: {e}")
|
|
break
|
|
|
|
self.cleanup()
|
|
|
|
def _print_banner(self):
|
|
print("\n" + "=" * 50)
|
|
print(" Teleoperation Started!")
|
|
print("=" * 50 + "\n")
|
|
|
|
def cleanup(self):
|
|
"""Release resources. Override in subclass to close controllers, streamers, etc."""
|
|
self.env.close()
|