# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn) # SPDX-License-Identifier: BSD-3-Clause """Base teleoperation agent with main loop, env management, and reset handling.""" from __future__ import annotations import logging from abc import ABC, abstractmethod import torch logger = logging.getLogger(__name__) class BaseTeleopAgent(ABC): """Abstract base for XR teleoperation agents. Provides the main simulation loop, env lifecycle, and reset handling. Subclasses implement assemble_action() to define their control dimensions. """ def __init__(self, env, simulation_app, *, debug_viewports: bool = True): self.env = env self.simulation_app = simulation_app self.device = env.unwrapped.device self.num_envs = env.num_envs self._should_reset = False self.sim_frame = 0 # Viewport management self._robot_viewports: dict[str, object] = {} self._robot_cam_prims: dict[str, str] = { "Left Hand": "/World/envs/env_0/Robot/rm_65_fb_left/Link_6/robot___left_8_02/cam_left_hand", "Right Hand": "/World/envs/env_0/Robot/rm_65_b_right/Link6/robot___right_8_2/cam_right_hand", "Head": "/World/envs/env_0/Robot/robot_head/ZED_X/base_link/ZED_X/CameraLeft", "Chest": "/World/envs/env_0/Robot/robot_trunk/cam_chest", } if debug_viewports: self._create_viewports() def request_reset(self): self._should_reset = True print("[INFO] Reset requested via XR button.") def _create_viewports(self): try: import omni.kit.viewport.utility as vp_util except ImportError: logger.warning("[Viewport] omni.kit.viewport.utility not available.") return for name, cam_path in self._robot_cam_prims.items(): vp_win = vp_util.create_viewport_window(f"Robot {name} View", width=640, height=360) vp_win.viewport_api.camera_path = cam_path self._robot_viewports[name] = vp_win print(f"[INFO] Viewport 'Robot {name} View' bound to: {cam_path}") def _rebind_viewports(self): for name, vp_win in self._robot_viewports.items(): vp_win.viewport_api.camera_path = self._robot_cam_prims[name] def _clear_ik_target_state(self): """Sync IK controller internals to current EEF pose to prevent jumps after reset.""" action_terms = self.env.action_manager._terms for key in self._ik_action_term_names(): if key in action_terms: term = action_terms[key] ee_pos_b, ee_quat_b = term._compute_frame_pose() term._raw_actions.zero_() term._processed_actions.zero_() term._ik_controller._command.zero_() term._ik_controller.ee_pos_des[:] = ee_pos_b term._ik_controller.ee_quat_des[:] = ee_quat_b def _ik_action_term_names(self) -> list[str]: """Return IK action term names to clear on reset. Override in subclass.""" return ["arm_action"] def _do_reset(self, obs): """Execute reset sequence. Returns new obs.""" obs, _ = self.env.reset() self._clear_ik_target_state() self._rebind_viewports() self._should_reset = False self.sim_frame = 0 self.on_reset() return obs def on_reset(self): """Hook for subclasses to reset their own state.""" pass @abstractmethod def assemble_action(self, obs) -> torch.Tensor: """Build the full action tensor for env.step(). Must be implemented by subclass.""" ... def post_step(self, obs): """Hook called after env.step(). Subclasses can add streaming, diagnostics, etc.""" pass def run(self): """Main simulation loop.""" obs, _ = self.env.reset() self._clear_ik_target_state() self.on_reset() self._print_banner() while self.simulation_app.is_running(): try: with torch.inference_mode(): if self._should_reset: obs = self._do_reset(obs) continue action = self.assemble_action(obs) actions = action.unsqueeze(0).repeat(self.num_envs, 1).to(self.device) obs, _, _, _, _ = self.env.step(actions) self.sim_frame += 1 self.post_step(obs) except Exception as e: logger.error(f"Error during simulation step: {e}") break self.cleanup() def _print_banner(self): print("\n" + "=" * 50) print(" Teleoperation Started!") print("=" * 50 + "\n") def cleanup(self): """Release resources. Override in subclass to close controllers, streamers, etc.""" self.env.close()