Add keyboard teleop device to control the end effector robot (#1289)

This commit is contained in:
Michel Aractingi
2025-06-14 09:10:09 +02:00
committed by GitHub
parent 35e67585bf
commit 6007a221f0
5 changed files with 215 additions and 15 deletions

View File

@@ -1,4 +1,9 @@
from .configuration_keyboard import KeyboardTeleopConfig
from .teleop_keyboard import KeyboardTeleop
from .configuration_keyboard import KeyboardEndEffectorTeleopConfig, KeyboardTeleopConfig
from .teleop_keyboard import KeyboardEndEffectorTeleop, KeyboardTeleop
__all__ = ["KeyboardTeleopConfig", "KeyboardTeleop"]
__all__ = [
"KeyboardTeleopConfig",
"KeyboardTeleop",
"KeyboardEndEffectorTeleopConfig",
"KeyboardEndEffectorTeleop",
]

View File

@@ -24,3 +24,9 @@ from ..config import TeleoperatorConfig
class KeyboardTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Consider setting in here the keys that we want to capture/listen
mock: bool = False
@TeleoperatorConfig.register_subclass("keyboard_ee")
@dataclass
class KeyboardEndEffectorTeleopConfig(KeyboardTeleopConfig):
use_gripper: bool = True

View File

@@ -24,7 +24,7 @@ from typing import Any
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .configuration_keyboard import KeyboardTeleopConfig
from .configuration_keyboard import KeyboardEndEffectorTeleopConfig, KeyboardTeleopConfig
PYNPUT_AVAILABLE = True
try:
@@ -145,3 +145,93 @@ class KeyboardTeleop(Teleoperator):
)
if self.listener is not None:
self.listener.stop()
class KeyboardEndEffectorTeleop(KeyboardTeleop):
"""
Teleop class to use keyboard inputs for end effector control.
Designed to be used with the `So100FollowerEndEffector` robot.
"""
config_class = KeyboardEndEffectorTeleopConfig
name = "keyboard_ee"
def __init__(self, config: KeyboardEndEffectorTeleopConfig):
super().__init__(config)
self.config = config
self.misc_keys_queue = Queue()
@property
def action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
"shape": (4,),
"names": {"delta_x": 0, "delta_y": 1, "delta_z": 2, "gripper": 3},
}
else:
return {
"dtype": "float32",
"shape": (3,),
"names": {"delta_x": 0, "delta_y": 1, "delta_z": 2},
}
def _on_press(self, key):
if hasattr(key, "char"):
key = key.char
self.event_queue.put((key, True))
def _on_release(self, key):
if hasattr(key, "char"):
key = key.char
self.event_queue.put((key, False))
def get_action(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
self._drain_pressed_keys()
delta_x = 0.0
delta_y = 0.0
delta_z = 0.0
# Generate action based on current key states
for key, val in self.current_pressed.items():
if key == keyboard.Key.up:
delta_x = int(val)
elif key == keyboard.Key.down:
delta_x = -int(val)
elif key == keyboard.Key.left:
delta_y = int(val)
elif key == keyboard.Key.right:
delta_y = -int(val)
elif key == keyboard.Key.shift:
delta_z = -int(val)
elif key == keyboard.Key.shift_r:
delta_z = int(val)
elif key == keyboard.Key.ctrl_r:
# Gripper actions are expected to be between 0 (close), 1 (stay), 2 (open)
gripper_action = int(val) + 1
elif key == keyboard.Key.ctrl_l:
gripper_action = int(val) - 1
elif val:
# If the key is pressed, add it to the misc_keys_queue
# this will record key presses that are not part of the delta_x, delta_y, delta_z
# this is useful for retrieving other events like interventions for RL, episode success, etc.
self.misc_keys_queue.put(key)
self.current_pressed.clear()
action_dict = {
"delta_x": delta_x,
"delta_y": delta_y,
"delta_z": delta_z,
}
gripper_action = 1 # default gripper action is to stay
if self.config.use_gripper:
action_dict["gripper"] = gripper_action
return action_dict

View File

@@ -49,5 +49,9 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .gamepad.teleop_gamepad import GamepadTeleop
return GamepadTeleop(config)
elif config.type == "keyboard_ee":
from .keyboard.teleop_keyboard import KeyboardEndEffectorTeleop
return KeyboardEndEffectorTeleop(config)
else:
raise ValueError(config.type)

View File

@@ -58,10 +58,12 @@ from lerobot.common.robots import ( # noqa: F401
)
from lerobot.common.teleoperators import (
gamepad, # noqa: F401
keyboard, # noqa: F401
make_teleoperator_from_config,
so101_leader, # noqa: F401
)
from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
from lerobot.common.teleoperators.keyboard.teleop_keyboard import KeyboardEndEffectorTeleop
from lerobot.common.utils.robot_utils import busy_wait
from lerobot.common.utils.utils import log_say
from lerobot.configs import parser
@@ -1191,7 +1193,7 @@ class BaseLeaderControlWrapper(gym.Wrapper):
"rerecord_episode": False,
}
def _handle_key_press(self, key, keyboard):
def _handle_key_press(self, key, keyboard_device):
"""
Handle key press events.
@@ -1202,10 +1204,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
This method should be overridden in subclasses for additional key handling.
"""
try:
if key == keyboard.Key.esc:
if key == keyboard_device.Key.esc:
self.keyboard_events["episode_end"] = True
return
if key == keyboard.Key.left:
if key == keyboard_device.Key.left:
self.keyboard_events["rerecord_episode"] = True
return
if hasattr(key, "char") and key.char == "s":
@@ -1221,13 +1223,13 @@ class BaseLeaderControlWrapper(gym.Wrapper):
This method sets up keyboard event handling if not in headless mode.
"""
from pynput import keyboard
from pynput import keyboard as keyboard_device
def on_press(key):
with self.event_lock:
self._handle_key_press(key, keyboard)
self._handle_key_press(key, keyboard_device)
self.listener = keyboard.Listener(on_press=on_press)
self.listener = keyboard_device.Listener(on_press=on_press)
self.listener.start()
def _check_intervention(self):
@@ -1403,7 +1405,7 @@ class GearedLeaderControlWrapper(BaseLeaderControlWrapper):
super()._init_keyboard_events()
self.keyboard_events["human_intervention_step"] = False
def _handle_key_press(self, key, keyboard):
def _handle_key_press(self, key, keyboard_device):
"""
Handle key presses including space for intervention toggle.
@@ -1413,8 +1415,8 @@ class GearedLeaderControlWrapper(BaseLeaderControlWrapper):
Extends the base handler to respond to space key for toggling intervention.
"""
super()._handle_key_press(key, keyboard)
if key == keyboard.Key.space:
super()._handle_key_press(key, keyboard_device)
if key == keyboard_device.Key.space:
if not self.keyboard_events["human_intervention_step"]:
logging.info(
"Space key pressed. Human intervention required.\n"
@@ -1574,7 +1576,7 @@ class GamepadControlWrapper(gym.Wrapper):
print(" Y/Triangle button: End episode (SUCCESS)")
print(" B/Circle button: Exit program")
def get_gamepad_action(
def get_teleop_commands(
self,
) -> tuple[bool, np.ndarray, bool, bool, bool]:
"""
@@ -1643,7 +1645,7 @@ class GamepadControlWrapper(gym.Wrapper):
terminate_episode,
success,
rerecord_episode,
) = self.get_gamepad_action()
) = self.get_teleop_commands()
# Update episode ending state if requested
if terminate_episode:
@@ -1700,6 +1702,90 @@ class GamepadControlWrapper(gym.Wrapper):
return self.env.close()
class KeyboardControlWrapper(GamepadControlWrapper):
"""
Wrapper that allows controlling a gym environment with a keyboard.
This wrapper intercepts the step method and allows human input via keyboard
to override the agent's actions when desired.
Inherits from GamepadControlWrapper to avoid code duplication.
"""
def __init__(
self,
env,
teleop_device, # Accepts an instantiated teleoperator
use_gripper=False, # This should align with teleop_device's config
auto_reset=False,
):
"""
Initialize the gamepad controller wrapper.
Args:
env: The environment to wrap.
teleop_device: The instantiated teleoperation device (e.g., GamepadTeleop).
use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper).
auto_reset: Whether to auto reset the environment when episode ends.
"""
super().__init__(env, teleop_device, use_gripper, auto_reset)
self.is_intervention_active = False
logging.info("Keyboard control wrapper initialized with provided teleop_device.")
print("Keyboard controls:")
print(" Arrow keys: Move in X-Y plane")
print(" Shift and Shift_R: Move in Z axis")
print(" Right Ctrl and Left Ctrl: Open and close gripper")
print(" f: End episode with FAILURE")
print(" s: End episode with SUCCESS")
print(" r: End episode with RERECORD")
print(" i: Start/Stop Intervention")
def get_teleop_commands(
self,
) -> tuple[bool, np.ndarray, bool, bool, bool]:
action_dict = self.teleop_device.get_action()
episode_end_status = None
# Unroll the misc_keys_queue to check for events related to intervention, episode success, etc.
while not self.teleop_device.misc_keys_queue.empty():
key = self.teleop_device.misc_keys_queue.get()
if key == "i":
self.is_intervention_active = not self.is_intervention_active
elif key == "f":
episode_end_status = "failure"
elif key == "s":
episode_end_status = "success"
elif key == "r":
episode_end_status = "rerecord_episode"
terminate_episode = episode_end_status is not None
success = episode_end_status == "success"
rerecord_episode = episode_end_status == "rerecord_episode"
# Convert action_dict to numpy array based on expected structure
# Order: delta_x, delta_y, delta_z, gripper (if use_gripper)
action_list = [action_dict["delta_x"], action_dict["delta_y"], action_dict["delta_z"]]
if self.use_gripper:
# GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open)
# This needs to be consistent with what EEActionWrapper expects if it's used downstream
# EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open)
# For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility.
gripper_val = action_dict.get("gripper", 1.0) # Default to 1.0 (stay) if not present
action_list.append(float(gripper_val))
gamepad_action_np = np.array(action_list, dtype=np.float32)
return (
self.is_intervention_active,
gamepad_action_np,
terminate_episode,
success,
rerecord_episode,
)
class GymHilDeviceWrapper(gym.Wrapper):
def __init__(self, env, device="cpu"):
super().__init__(env)
@@ -1843,6 +1929,15 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "keyboard_ee":
assert isinstance(teleop_device, KeyboardEndEffectorTeleop), (
"teleop_device must be an instance of KeyboardEndEffectorTeleop for keyboard control mode"
)
env = KeyboardControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,