Files
mindbot/scripts/environments/teleoperation/xr_teleop/dual_arm_head_agent.py
yt lee eef7ff838d Fix PrismaticJoint (trunk) control: IdealPDActuator with high stiffness
Replace ImplicitActuator (which doesn't work for prismatic joints) with
IdealPDActuator. This bypasses PhysX drive and explicitly computes PD forces
using set_dof_actuation_forces().

Changes:
- stiffness=100k, damping=5k to resist arm reaction forces (~150N) with <2mm deflection
- Read trunk_cmd from ArticulationCfg initial position (single source of truth)
- Integrate trunk diagnostics into robot state output
- Remove hardcoded trunk_target parameter
2026-03-25 16:32:52 +08:00

92 lines
3.1 KiB
Python

# Copyright (c) 2025, Yutang Li, SIAT (yt.li2@siat.ac.cn)
# SPDX-License-Identifier: BSD-3-Clause
"""Dual-arm + head tracking + trunk + stereo streaming XR agent."""
from __future__ import annotations
import torch
from .dual_arm_agent import DualArmXrAgent
from .head_tracker import HeadTracker
from .streaming import StreamingManager
from .diagnostics import DiagnosticsReporter
class DualArmHeadXrAgent(DualArmXrAgent):
"""Extends DualArmXrAgent with head tracking, trunk hold, and VR stereo streaming.
Action: left_arm(7) | wheel(4) | left_grip(1) | right_arm(7) | right_grip(1) | head(2) | trunk(1) = 23D
"""
def __init__(self, env, simulation_app, *,
pos_sensitivity: float = 1.0,
rot_sensitivity: float = 0.3,
base_speed: float = 5.0,
base_turn: float = 2.0,
drive_speed: float = 0.5,
drive_turn: float = 1.5,
stream_to: str | None = None,
stream_port: int = 12345,
stream_bitrate: int = 20_000_000,
debug_viewports: bool = True):
super().__init__(
env, simulation_app,
pos_sensitivity=pos_sensitivity, rot_sensitivity=rot_sensitivity,
base_speed=base_speed, base_turn=base_turn,
drive_speed=drive_speed, drive_turn=drive_turn,
debug_viewports=debug_viewports,
)
self.head_tracker = HeadTracker()
# Read trunk initial position from ArticulationCfg (single source of truth)
robot = env.unwrapped.scene["robot"]
joint_names = robot.data.joint_names
trunk_idx = joint_names.index("PrismaticJoint")
trunk_init = robot.data.default_joint_pos[0, trunk_idx].item()
self.trunk_cmd = torch.tensor([trunk_init], dtype=torch.float32)
# Streaming
self.streamer: StreamingManager | None = None
if stream_to:
scene = env.unwrapped.scene
self.streamer = StreamingManager(stream_to, stream_port, scene, bitrate=stream_bitrate)
# Diagnostics
self.diagnostics = DiagnosticsReporter(interval=30, is_dual_arm=True)
def on_reset(self):
super().on_reset()
self.head_tracker.reset()
def assemble_action(self, obs) -> torch.Tensor:
base_action = super().assemble_action(obs)
# Head tracking
head_targets = self.head_tracker.get_targets(self.shared_client)
head_cmd = torch.tensor(head_targets, dtype=torch.float32)
# base(20) | head(2) | trunk(1)
return torch.cat([base_action, head_cmd, self.trunk_cmd])
def post_step(self, obs):
super().post_step(obs)
# Stereo streaming
if self.streamer is not None:
scene = self.env.unwrapped.scene
self.streamer.send(scene, self.sim_frame)
# Diagnostics
self.diagnostics.report(
self.env, obs, self.sim_frame,
xr_client=self.shared_client,
trunk_target=self.trunk_cmd[0].item(),
)
def cleanup(self):
if self.streamer is not None:
self.streamer.close()
super().cleanup()