# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn) # SPDX-License-Identifier: BSD-3-Clause """Periodic diagnostics reporter for teleoperation debugging.""" import numpy as np class DiagnosticsReporter: """Prints robot state info every N frames during teleoperation.""" def __init__(self, interval: int = 30, is_dual_arm: bool = False): self.interval = interval self.is_dual_arm = is_dual_arm self._left_arm_idx: list[int] | None = None self._right_arm_idx: list[int] | None = None self._joint_names_printed = False def _ensure_indices(self, robot): if self._left_arm_idx is None: jnames = robot.joint_names self._left_arm_idx = [i for i, n in enumerate(jnames) if n.startswith("l_joint")] self._right_arm_idx = [i for i, n in enumerate(jnames) if n.startswith("r_joint")] self._trunk_idx = jnames.index("PrismaticJoint") if "PrismaticJoint" in jnames else None def report(self, env, obs, frame: int, last_act: np.ndarray | None = None, xr_client=None, read_gripper_contact=None, trunk_target: float | None = None): """Print diagnostics if frame is on interval boundary.""" if frame % self.interval != 0: return np.set_printoptions(precision=4, suppress=True, floatmode='fixed') policy_obs = obs["policy"] joint_pos = policy_obs["joint_pos"][0].cpu().numpy() if last_act is None: last_act = policy_obs["actions"][0].cpu().numpy() robot = env.unwrapped.scene["robot"] self._ensure_indices(robot) # Print full joint table on first report if not self._joint_names_printed: jnames = robot.joint_names print(f"\n{'=' * 70}") print(f" ALL {len(jnames)} JOINT NAMES AND POSITIONS") print(f"{'=' * 70}") for i, name in enumerate(jnames): print(f" [{i:2d}] {name:30s} = {joint_pos[i]:+.4f}") print(f" Deduced left arm indices: {self._left_arm_idx}") print(f"{'=' * 70}\n") self._joint_names_printed = True arm_joints = joint_pos[self._left_arm_idx] right_arm_joints = joint_pos[self._right_arm_idx] joy_str = "N/A" if xr_client is not None: try: joy_dbg = xr_client.get_joystick("left") joy_str = f"[{joy_dbg[0]:+.2f}, {joy_dbg[1]:+.2f}]" except Exception: pass if self.is_dual_arm: eef_pos_left = policy_obs["eef_pos_left"][0].cpu().numpy() eef_pos_right = policy_obs["eef_pos_right"][0].cpu().numpy() contact_str = read_gripper_contact() if read_gripper_contact else "N/A" print(f"\n---------------- [ROBOT STATE frame={frame}] ----------------") print(f"| Left Arm Joints (rad): {arm_joints}") print(f"| Right Arm Joints (rad): {right_arm_joints}") print(f"| Left EEF Pos (world, m): {eef_pos_left}") print(f"| Right EEF Pos (world, m): {eef_pos_right}") print(f"| Left Gripper Contact: {contact_str}") print(f"| Cmd left_arm(abs): {last_act[:7]}") print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}") print(f"| Cmd left_gripper: {last_act[11:12]} (+1=open -1=close)") print(f"| Cmd right_arm(abs): {last_act[12:19]}") print(f"| Cmd right_gripper: {last_act[19:]} (+1=open -1=close)") print(f"| Joystick (x=turn,y=fwd): {joy_str}") if self._trunk_idx is not None: trunk_pos = robot.data.joint_pos[0, self._trunk_idx].item() trunk_vel = robot.data.joint_vel[0, self._trunk_idx].item() trunk_force = robot.data.applied_torque[0, self._trunk_idx].item() if hasattr(robot.data, 'applied_torque') else float('nan') trunk_tgt_str = f"{trunk_target:.4f}" if trunk_target is not None else "N/A" print(f"| Trunk pos={trunk_pos:.4f} vel={trunk_vel:.6f} target={trunk_tgt_str} force={trunk_force:.2f}") print(f"----------------------------------------------------------------") else: eef_pos = policy_obs["eef_pos"][0].cpu().numpy() print(f"\n---------------- [ROBOT STATE frame={frame}] ----------------") print(f"| Left Arm Joints (rad): {arm_joints}") print(f"| EEF Pos (world, m): {eef_pos}") print(f"| Cmd arm(abs pos+quat): {last_act[:7]}") print(f"| Cmd wheel vel (rad/s): {last_act[7:11]}") print(f"| Cmd gripper: {last_act[11:]} (+1=open -1=close)") print(f"| Joystick (x=turn,y=fwd): {joy_str}") print(f"----------------------------------------------------------------")