From a065bd61aec5c5986901126dcd2878908c7127cf Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Mon, 10 Mar 2025 18:28:50 +0100 Subject: [PATCH] Add keyboard teleop --- .../common/teleoperators/keyboard/__init__.py | 4 + .../keyboard/configuration_keyboard.py | 25 ++++ .../teleoperators/keyboard/teleop_keyboard.py | 128 ++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 lerobot/common/teleoperators/keyboard/__init__.py create mode 100644 lerobot/common/teleoperators/keyboard/configuration_keyboard.py create mode 100644 lerobot/common/teleoperators/keyboard/teleop_keyboard.py diff --git a/lerobot/common/teleoperators/keyboard/__init__.py b/lerobot/common/teleoperators/keyboard/__init__.py new file mode 100644 index 000000000..9d27a34d6 --- /dev/null +++ b/lerobot/common/teleoperators/keyboard/__init__.py @@ -0,0 +1,4 @@ +from .configuration_keyboard import KeyboardTeleopConfig +from .teleop_keyboard import KeyboardTeleop + +__all__ = ["KeyboardTeleopConfig", "KeyboardTeleop"] diff --git a/lerobot/common/teleoperators/keyboard/configuration_keyboard.py b/lerobot/common/teleoperators/keyboard/configuration_keyboard.py new file mode 100644 index 000000000..91b596bf7 --- /dev/null +++ b/lerobot/common/teleoperators/keyboard/configuration_keyboard.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("keyboard") +@dataclass +class KeyboardTeleopConfig(TeleoperatorConfig): + mock: bool = False diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py new file mode 100644 index 000000000..ff85bb40e --- /dev/null +++ b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import sys +import time + +import numpy as np + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..teleoperator import Teleoperator +from .configuration_keyboard import KeyboardTeleopConfig + +PYNPUT_AVAILABLE = True +try: + # Only import if there's a valid X server or if we're not on a Pi + if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): + logging.info("No DISPLAY set. Skipping pynput import.") + raise ImportError("pynput blocked intentionally due to no display.") + + from pynput import keyboard +except ImportError: + keyboard = None + PYNPUT_AVAILABLE = False +except Exception as e: + keyboard = None + PYNPUT_AVAILABLE = False + logging.info(f"Could not import pynput: {e}") + + +class KeyboardTeleop(Teleoperator): + """ + Teleop class to use keyboard inputs for control. + """ + + config_class = KeyboardTeleopConfig + name = "keyboard" + + def __init__(self, config: KeyboardTeleopConfig): + super().__init__(config) + self.config = config + self.robot_type = config.type + self.id = config.id + + self.pressed_keys = {} + self.listener = None + self.is_connected = False + self.logs = {} + + @property + def action_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.arm),), + "names": {"motors": list(self.arm.motors)}, + } + + @property + def feedback_feature(self) -> dict: + return {} + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError( + "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." + ) + + if PYNPUT_AVAILABLE: + logging.info("pynput is available - enabling local keyboard listener.") + self.listener = keyboard.Listener( + on_press=self.on_press, + on_release=self.on_release, + ) + self.listener.start() + else: + logging.info("pynput not available - skipping local keyboard listener.") + self.listener = None + + self.is_connected = True + + def calibrate(self) -> None: + pass + + def on_press(self, key): + if hasattr(key, "char"): + self.pressed_keys[key.char] = True + + def on_release(self, key): + if hasattr(key, "char"): + self.pressed_keys[key.char] = False + if key == keyboard.Key.esc: + logging.info("ESC pressed, disconnecting.") + self.disconnect() + + def get_action(self) -> np.ndarray: + before_read_t = time.perf_counter() + # pressed_keys.items is wrapped in a list to avoid any RuntimeError due to dictionary changing size + # during iteration + action = {key for key, val in list(self.pressed_keys.items()) if val} + self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + + return action + + def send_feedback(self, feedback: np.ndarray) -> None: + pass + + def disconnect(self) -> None: + if not self.is_connected: + raise DeviceNotConnectedError( + "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." + ) + self.listener.stop() + self.is_connected = False