204 lines
8.0 KiB
Python
204 lines
8.0 KiB
Python
# Copyright (c) 2022-2026, The Isaac Lab Project Developers.
|
||
# SPDX-License-Identifier: BSD-3-Clause
|
||
|
||
"""Script to run teleoperation for MindRobot with chassis (wheel) control.
|
||
|
||
Extends teleop_se3_agent.py with differential-drive keyboard control:
|
||
↑ / ↓ — chassis forward / backward
|
||
← / → — chassis left turn / right turn (in-place, skid-steer)
|
||
W/S/A/D/Q/E — left arm IK: translate X/Y/Z
|
||
Z/X T/G C/V — left arm IK: rotate roll/pitch/yaw
|
||
K — left gripper toggle open/close
|
||
R — reset environment
|
||
"""
|
||
|
||
import argparse
|
||
import weakref
|
||
import numpy as np
|
||
import torch
|
||
|
||
from isaaclab.app import AppLauncher
|
||
|
||
parser = argparse.ArgumentParser(description="MindRobot teleoperation with wheel control.")
|
||
parser.add_argument("--num_envs", type=int, default=1)
|
||
parser.add_argument("--task", type=str, default=None)
|
||
parser.add_argument("--sensitivity", type=float, default=1.0)
|
||
parser.add_argument("--wheel_speed", type=float, default=5.0,
|
||
help="Wheel velocity in rad/s (default: 5.0)")
|
||
AppLauncher.add_app_launcher_args(parser)
|
||
args_cli = parser.parse_args()
|
||
|
||
app_launcher = AppLauncher(args_cli)
|
||
simulation_app = app_launcher.app
|
||
|
||
# ---------- Rest everything follows ----------
|
||
|
||
import logging
|
||
import gymnasium as gym
|
||
import carb
|
||
|
||
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
|
||
from isaaclab.envs import ManagerBasedRLEnvCfg
|
||
from isaaclab.managers import TerminationTermCfg as DoneTerm
|
||
|
||
import mindbot.tasks # noqa: F401
|
||
from isaaclab_tasks.utils import parse_env_cfg
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# WheelKeyboard: 监听方向键,输出 4 维轮速 tensor
|
||
# ─────────────────────────────────────────────────────────────
|
||
class WheelKeyboard:
|
||
"""Differential-drive (skid-steer) keyboard controller.
|
||
|
||
Listens to arrow keys via Carb and produces a 4-D joint-velocity
|
||
command: [right_b, left_b, left_f, right_f] (rad/s).
|
||
|
||
Key mappings
|
||
─────────────────────────────────────────────────────
|
||
↑ (UP) forward [ v, v, v, v]
|
||
↓ (DOWN) backward [-v, -v, -v, -v]
|
||
← (LEFT) left turn [ v, -v, -v, v] right wheels fwd
|
||
→ (RIGHT) right turn[-v, v, v, -v] left wheels fwd
|
||
─────────────────────────────────────────────────────
|
||
"""
|
||
|
||
_WHEEL_KEYS = {"UP", "DOWN", "LEFT", "RIGHT"}
|
||
|
||
def __init__(self, wheel_speed: float = 5.0, sim_device: str = "cuda:0"):
|
||
self.wheel_speed = wheel_speed
|
||
self._sim_device = sim_device
|
||
# 4 维速度缓冲:[right_b, left_b, left_f, right_f]
|
||
self._wheel_vel = np.zeros(4)
|
||
# 按键 → 速度向量映射 (在 _create_bindings 中填充)
|
||
self._key_map: dict[str, np.ndarray] = {}
|
||
self._create_bindings()
|
||
|
||
# 订阅 Carb 键盘事件
|
||
import omni
|
||
self._appwindow = omni.appwindow.get_default_app_window()
|
||
self._input = carb.input.acquire_input_interface()
|
||
self._keyboard = self._appwindow.get_keyboard()
|
||
self._keyboard_sub = self._input.subscribe_to_keyboard_events(
|
||
self._keyboard,
|
||
lambda event, *args, obj=weakref.proxy(self): obj._on_keyboard_event(event, *args),
|
||
)
|
||
|
||
def __del__(self):
|
||
self._input.unsubscribe_to_keyboard_events(self._keyboard, self._keyboard_sub)
|
||
|
||
def __str__(self) -> str:
|
||
return (
|
||
"WheelKeyboard (skid-steer chassis controller)\n"
|
||
"\t↑ UP — forward\n"
|
||
"\t↓ DOWN — backward\n"
|
||
"\t← LEFT — left turn (in-place)\n"
|
||
"\t→ RIGHT — right turn (in-place)"
|
||
)
|
||
|
||
def reset(self):
|
||
self._wheel_vel = np.zeros(4)
|
||
|
||
def advance(self) -> torch.Tensor:
|
||
"""Returns 4-D wheel velocity tensor [right_b, left_b, left_f, right_f]."""
|
||
return torch.tensor(self._wheel_vel.copy(), dtype=torch.float32, device=self._sim_device)
|
||
|
||
def _create_bindings(self):
|
||
v = self.wheel_speed
|
||
# [right_b, left_b, left_f, right_f]
|
||
self._key_map = {
|
||
"UP": np.array([ v, v, v, v]), # 前进
|
||
"DOWN": np.array([-v, -v, -v, -v]), # 后退
|
||
"LEFT": np.array([ v, -v, -v, v]), # 左转
|
||
"RIGHT": np.array([-v, v, v, -v]), # 右转
|
||
}
|
||
|
||
def _on_keyboard_event(self, event, *args, **kwargs):
|
||
if event.type == carb.input.KeyboardEventType.KEY_PRESS:
|
||
key = event.input.name
|
||
if key in self._key_map:
|
||
self._wheel_vel += self._key_map[key]
|
||
if event.type == carb.input.KeyboardEventType.KEY_RELEASE:
|
||
key = event.input.name
|
||
if key in self._key_map:
|
||
self._wheel_vel -= self._key_map[key]
|
||
return True
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# main
|
||
# ─────────────────────────────────────────────────────────────
|
||
def main() -> None:
|
||
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs)
|
||
env_cfg.env_name = args_cli.task
|
||
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
|
||
raise ValueError(f"Unsupported env config type: {type(env_cfg).__name__}")
|
||
env_cfg.terminations.time_out = None
|
||
|
||
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
|
||
|
||
should_reset = False
|
||
|
||
def on_reset():
|
||
nonlocal should_reset
|
||
should_reset = True
|
||
print("[teleop] Reset triggered.")
|
||
|
||
# ── 机械臂 IK 控制器(Se3Keyboard,7 维输出)──
|
||
arm_keyboard = Se3Keyboard(
|
||
Se3KeyboardCfg(
|
||
pos_sensitivity=0.05 * args_cli.sensitivity,
|
||
rot_sensitivity=0.05 * args_cli.sensitivity,
|
||
sim_device=args_cli.device,
|
||
)
|
||
)
|
||
arm_keyboard.add_callback("R", on_reset)
|
||
|
||
# ── 底盘轮速控制器(WheelKeyboard,4 维输出)──
|
||
wheel_keyboard = WheelKeyboard(
|
||
wheel_speed=args_cli.wheel_speed,
|
||
sim_device=args_cli.device,
|
||
)
|
||
|
||
print(arm_keyboard)
|
||
print(wheel_keyboard)
|
||
print("\nAction layout: [arm_IK(6) | wheel_vel(4) | gripper(1)] = 11 dims")
|
||
print("Press R to reset. Press Ctrl+C to exit.\n")
|
||
|
||
env.reset()
|
||
arm_keyboard.reset()
|
||
wheel_keyboard.reset()
|
||
|
||
while simulation_app.is_running():
|
||
with torch.inference_mode():
|
||
# ── 1. 获取机械臂 delta pose + gripper (7维) ──
|
||
arm_cmd = arm_keyboard.advance() # [dx,dy,dz,rx,ry,rz, gripper]
|
||
arm_ik = arm_cmd[:6] # (6,)
|
||
gripper = arm_cmd[6:7] # (1,)
|
||
|
||
# ── 2. 获取底盘轮速 (4维) ──
|
||
wheel_vel = wheel_keyboard.advance() # [right_b, left_b, left_f, right_f]
|
||
|
||
# ── 3. 拼接完整 action (11维) ──
|
||
# 顺序与 MindRobotTeleopActionsCfg 中的声明顺序一致:
|
||
# arm_action(6) → wheel_action(4) → gripper_action(1)
|
||
action = torch.cat([arm_ik, wheel_vel, gripper]) # (11,)
|
||
actions = action.unsqueeze(0).repeat(env.num_envs, 1)
|
||
|
||
env.step(actions)
|
||
|
||
if should_reset:
|
||
env.reset()
|
||
arm_keyboard.reset()
|
||
wheel_keyboard.reset()
|
||
should_reset = False
|
||
print("[teleop] Environment reset complete.")
|
||
|
||
env.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
simulation_app.close() |