From 6007a221f0d5e425e3c212e079688fa80f96a268 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Sat, 14 Jun 2025 09:10:09 +0200 Subject: [PATCH] Add keyboard teleop device to control the end effector robot (#1289) --- .../common/teleoperators/keyboard/__init__.py | 11 +- .../keyboard/configuration_keyboard.py | 6 + .../teleoperators/keyboard/teleop_keyboard.py | 92 +++++++++++++- lerobot/common/teleoperators/utils.py | 4 + lerobot/scripts/rl/gym_manipulator.py | 117 ++++++++++++++++-- 5 files changed, 215 insertions(+), 15 deletions(-) diff --git a/lerobot/common/teleoperators/keyboard/__init__.py b/lerobot/common/teleoperators/keyboard/__init__.py index 9d27a34d6..5761bf788 100644 --- a/lerobot/common/teleoperators/keyboard/__init__.py +++ b/lerobot/common/teleoperators/keyboard/__init__.py @@ -1,4 +1,9 @@ -from .configuration_keyboard import KeyboardTeleopConfig -from .teleop_keyboard import KeyboardTeleop +from .configuration_keyboard import KeyboardEndEffectorTeleopConfig, KeyboardTeleopConfig +from .teleop_keyboard import KeyboardEndEffectorTeleop, KeyboardTeleop -__all__ = ["KeyboardTeleopConfig", "KeyboardTeleop"] +__all__ = [ + "KeyboardTeleopConfig", + "KeyboardTeleop", + "KeyboardEndEffectorTeleopConfig", + "KeyboardEndEffectorTeleop", +] diff --git a/lerobot/common/teleoperators/keyboard/configuration_keyboard.py b/lerobot/common/teleoperators/keyboard/configuration_keyboard.py index ce6c9206e..5d5ef364f 100644 --- a/lerobot/common/teleoperators/keyboard/configuration_keyboard.py +++ b/lerobot/common/teleoperators/keyboard/configuration_keyboard.py @@ -24,3 +24,9 @@ from ..config import TeleoperatorConfig class KeyboardTeleopConfig(TeleoperatorConfig): # TODO(Steven): Consider setting in here the keys that we want to capture/listen mock: bool = False + + +@TeleoperatorConfig.register_subclass("keyboard_ee") +@dataclass +class KeyboardEndEffectorTeleopConfig(KeyboardTeleopConfig): + use_gripper: bool = True diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py index a72710e9d..bd3ab903e 100644 --- a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py +++ b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py @@ -24,7 +24,7 @@ from typing import Any from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from ..teleoperator import Teleoperator -from .configuration_keyboard import KeyboardTeleopConfig +from .configuration_keyboard import KeyboardEndEffectorTeleopConfig, KeyboardTeleopConfig PYNPUT_AVAILABLE = True try: @@ -145,3 +145,93 @@ class KeyboardTeleop(Teleoperator): ) if self.listener is not None: self.listener.stop() + + +class KeyboardEndEffectorTeleop(KeyboardTeleop): + """ + Teleop class to use keyboard inputs for end effector control. + Designed to be used with the `So100FollowerEndEffector` robot. + """ + + config_class = KeyboardEndEffectorTeleopConfig + name = "keyboard_ee" + + def __init__(self, config: KeyboardEndEffectorTeleopConfig): + super().__init__(config) + self.config = config + self.misc_keys_queue = Queue() + + @property + def action_features(self) -> dict: + if self.config.use_gripper: + return { + "dtype": "float32", + "shape": (4,), + "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2, "gripper": 3}, + } + else: + return { + "dtype": "float32", + "shape": (3,), + "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2}, + } + + def _on_press(self, key): + if hasattr(key, "char"): + key = key.char + self.event_queue.put((key, True)) + + def _on_release(self, key): + if hasattr(key, "char"): + key = key.char + self.event_queue.put((key, False)) + + def get_action(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError( + "KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`." + ) + + self._drain_pressed_keys() + delta_x = 0.0 + delta_y = 0.0 + delta_z = 0.0 + + # Generate action based on current key states + for key, val in self.current_pressed.items(): + if key == keyboard.Key.up: + delta_x = int(val) + elif key == keyboard.Key.down: + delta_x = -int(val) + elif key == keyboard.Key.left: + delta_y = int(val) + elif key == keyboard.Key.right: + delta_y = -int(val) + elif key == keyboard.Key.shift: + delta_z = -int(val) + elif key == keyboard.Key.shift_r: + delta_z = int(val) + elif key == keyboard.Key.ctrl_r: + # Gripper actions are expected to be between 0 (close), 1 (stay), 2 (open) + gripper_action = int(val) + 1 + elif key == keyboard.Key.ctrl_l: + gripper_action = int(val) - 1 + elif val: + # If the key is pressed, add it to the misc_keys_queue + # this will record key presses that are not part of the delta_x, delta_y, delta_z + # this is useful for retrieving other events like interventions for RL, episode success, etc. + self.misc_keys_queue.put(key) + + self.current_pressed.clear() + + action_dict = { + "delta_x": delta_x, + "delta_y": delta_y, + "delta_z": delta_z, + } + + gripper_action = 1 # default gripper action is to stay + if self.config.use_gripper: + action_dict["gripper"] = gripper_action + + return action_dict diff --git a/lerobot/common/teleoperators/utils.py b/lerobot/common/teleoperators/utils.py index d7b7bcf0e..b49addc15 100644 --- a/lerobot/common/teleoperators/utils.py +++ b/lerobot/common/teleoperators/utils.py @@ -49,5 +49,9 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: from .gamepad.teleop_gamepad import GamepadTeleop return GamepadTeleop(config) + elif config.type == "keyboard_ee": + from .keyboard.teleop_keyboard import KeyboardEndEffectorTeleop + + return KeyboardEndEffectorTeleop(config) else: raise ValueError(config.type) diff --git a/lerobot/scripts/rl/gym_manipulator.py b/lerobot/scripts/rl/gym_manipulator.py index 98445e666..3d2a62777 100644 --- a/lerobot/scripts/rl/gym_manipulator.py +++ b/lerobot/scripts/rl/gym_manipulator.py @@ -58,10 +58,12 @@ from lerobot.common.robots import ( # noqa: F401 ) from lerobot.common.teleoperators import ( gamepad, # noqa: F401 + keyboard, # noqa: F401 make_teleoperator_from_config, so101_leader, # noqa: F401 ) from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop +from lerobot.common.teleoperators.keyboard.teleop_keyboard import KeyboardEndEffectorTeleop from lerobot.common.utils.robot_utils import busy_wait from lerobot.common.utils.utils import log_say from lerobot.configs import parser @@ -1191,7 +1193,7 @@ class BaseLeaderControlWrapper(gym.Wrapper): "rerecord_episode": False, } - def _handle_key_press(self, key, keyboard): + def _handle_key_press(self, key, keyboard_device): """ Handle key press events. @@ -1202,10 +1204,10 @@ class BaseLeaderControlWrapper(gym.Wrapper): This method should be overridden in subclasses for additional key handling. """ try: - if key == keyboard.Key.esc: + if key == keyboard_device.Key.esc: self.keyboard_events["episode_end"] = True return - if key == keyboard.Key.left: + if key == keyboard_device.Key.left: self.keyboard_events["rerecord_episode"] = True return if hasattr(key, "char") and key.char == "s": @@ -1221,13 +1223,13 @@ class BaseLeaderControlWrapper(gym.Wrapper): This method sets up keyboard event handling if not in headless mode. """ - from pynput import keyboard + from pynput import keyboard as keyboard_device def on_press(key): with self.event_lock: - self._handle_key_press(key, keyboard) + self._handle_key_press(key, keyboard_device) - self.listener = keyboard.Listener(on_press=on_press) + self.listener = keyboard_device.Listener(on_press=on_press) self.listener.start() def _check_intervention(self): @@ -1403,7 +1405,7 @@ class GearedLeaderControlWrapper(BaseLeaderControlWrapper): super()._init_keyboard_events() self.keyboard_events["human_intervention_step"] = False - def _handle_key_press(self, key, keyboard): + def _handle_key_press(self, key, keyboard_device): """ Handle key presses including space for intervention toggle. @@ -1413,8 +1415,8 @@ class GearedLeaderControlWrapper(BaseLeaderControlWrapper): Extends the base handler to respond to space key for toggling intervention. """ - super()._handle_key_press(key, keyboard) - if key == keyboard.Key.space: + super()._handle_key_press(key, keyboard_device) + if key == keyboard_device.Key.space: if not self.keyboard_events["human_intervention_step"]: logging.info( "Space key pressed. Human intervention required.\n" @@ -1574,7 +1576,7 @@ class GamepadControlWrapper(gym.Wrapper): print(" Y/Triangle button: End episode (SUCCESS)") print(" B/Circle button: Exit program") - def get_gamepad_action( + def get_teleop_commands( self, ) -> tuple[bool, np.ndarray, bool, bool, bool]: """ @@ -1643,7 +1645,7 @@ class GamepadControlWrapper(gym.Wrapper): terminate_episode, success, rerecord_episode, - ) = self.get_gamepad_action() + ) = self.get_teleop_commands() # Update episode ending state if requested if terminate_episode: @@ -1700,6 +1702,90 @@ class GamepadControlWrapper(gym.Wrapper): return self.env.close() +class KeyboardControlWrapper(GamepadControlWrapper): + """ + Wrapper that allows controlling a gym environment with a keyboard. + + This wrapper intercepts the step method and allows human input via keyboard + to override the agent's actions when desired. + + Inherits from GamepadControlWrapper to avoid code duplication. + """ + + def __init__( + self, + env, + teleop_device, # Accepts an instantiated teleoperator + use_gripper=False, # This should align with teleop_device's config + auto_reset=False, + ): + """ + Initialize the gamepad controller wrapper. + + Args: + env: The environment to wrap. + teleop_device: The instantiated teleoperation device (e.g., GamepadTeleop). + use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper). + auto_reset: Whether to auto reset the environment when episode ends. + """ + super().__init__(env, teleop_device, use_gripper, auto_reset) + + self.is_intervention_active = False + + logging.info("Keyboard control wrapper initialized with provided teleop_device.") + print("Keyboard controls:") + print(" Arrow keys: Move in X-Y plane") + print(" Shift and Shift_R: Move in Z axis") + print(" Right Ctrl and Left Ctrl: Open and close gripper") + print(" f: End episode with FAILURE") + print(" s: End episode with SUCCESS") + print(" r: End episode with RERECORD") + print(" i: Start/Stop Intervention") + + def get_teleop_commands( + self, + ) -> tuple[bool, np.ndarray, bool, bool, bool]: + action_dict = self.teleop_device.get_action() + episode_end_status = None + + # Unroll the misc_keys_queue to check for events related to intervention, episode success, etc. + while not self.teleop_device.misc_keys_queue.empty(): + key = self.teleop_device.misc_keys_queue.get() + if key == "i": + self.is_intervention_active = not self.is_intervention_active + elif key == "f": + episode_end_status = "failure" + elif key == "s": + episode_end_status = "success" + elif key == "r": + episode_end_status = "rerecord_episode" + + terminate_episode = episode_end_status is not None + success = episode_end_status == "success" + rerecord_episode = episode_end_status == "rerecord_episode" + + # Convert action_dict to numpy array based on expected structure + # Order: delta_x, delta_y, delta_z, gripper (if use_gripper) + action_list = [action_dict["delta_x"], action_dict["delta_y"], action_dict["delta_z"]] + if self.use_gripper: + # GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open) + # This needs to be consistent with what EEActionWrapper expects if it's used downstream + # EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open) + # For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility. + gripper_val = action_dict.get("gripper", 1.0) # Default to 1.0 (stay) if not present + action_list.append(float(gripper_val)) + + gamepad_action_np = np.array(action_list, dtype=np.float32) + + return ( + self.is_intervention_active, + gamepad_action_np, + terminate_episode, + success, + rerecord_episode, + ) + + class GymHilDeviceWrapper(gym.Wrapper): def __init__(self, env, device="cpu"): super().__init__(env) @@ -1843,6 +1929,15 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env: teleop_device=teleop_device, use_gripper=cfg.wrapper.use_gripper, ) + elif control_mode == "keyboard_ee": + assert isinstance(teleop_device, KeyboardEndEffectorTeleop), ( + "teleop_device must be an instance of KeyboardEndEffectorTeleop for keyboard control mode" + ) + env = KeyboardControlWrapper( + env=env, + teleop_device=teleop_device, + use_gripper=cfg.wrapper.use_gripper, + ) elif control_mode == "leader": env = GearedLeaderControlWrapper( env=env,