Add keyboard teleop

This commit is contained in:
Simon Alibert
2025-03-10 18:28:50 +01:00
parent 5dc3c74e64
commit a065bd61ae
3 changed files with 157 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from .configuration_keyboard import KeyboardTeleopConfig
from .teleop_keyboard import KeyboardTeleop
__all__ = ["KeyboardTeleopConfig", "KeyboardTeleop"]

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("keyboard")
@dataclass
class KeyboardTeleopConfig(TeleoperatorConfig):
mock: bool = False

View File

@@ -0,0 +1,128 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import time
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .configuration_keyboard import KeyboardTeleopConfig
PYNPUT_AVAILABLE = True
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
logging.info(f"Could not import pynput: {e}")
class KeyboardTeleop(Teleoperator):
"""
Teleop class to use keyboard inputs for control.
"""
config_class = KeyboardTeleopConfig
name = "keyboard"
def __init__(self, config: KeyboardTeleopConfig):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.pressed_keys = {}
self.listener = None
self.is_connected = False
self.logs = {}
@property
def action_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
}
@property
def feedback_feature(self) -> dict:
return {}
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
)
if PYNPUT_AVAILABLE:
logging.info("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release,
)
self.listener.start()
else:
logging.info("pynput not available - skipping local keyboard listener.")
self.listener = None
self.is_connected = True
def calibrate(self) -> None:
pass
def on_press(self, key):
if hasattr(key, "char"):
self.pressed_keys[key.char] = True
def on_release(self, key):
if hasattr(key, "char"):
self.pressed_keys[key.char] = False
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()
def get_action(self) -> np.ndarray:
before_read_t = time.perf_counter()
# pressed_keys.items is wrapped in a list to avoid any RuntimeError due to dictionary changing size
# during iteration
action = {key for key, val in list(self.pressed_keys.items()) if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return action
def send_feedback(self, feedback: np.ndarray) -> None:
pass
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
)
self.listener.stop()
self.is_connected = False