Files
lerobot/lerobot/common/teleoperators/keyboard/teleop_keyboard.py
2025-04-04 09:45:18 +02:00

143 lines
4.3 KiB
Python

#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import time
from queue import Queue
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .configuration_keyboard import KeyboardTeleopConfig
PYNPUT_AVAILABLE = True
try:
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
logging.info(f"Could not import pynput: {e}")
class KeyboardTeleop(Teleoperator):
"""
Teleop class to use keyboard inputs for control.
"""
config_class = KeyboardTeleopConfig
name = "keyboard"
def __init__(self, config: KeyboardTeleopConfig):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.event_queue = Queue()
self.current_pressed = {}
self.listener = None
self.is_connected = False
self.logs = {}
@property
def action_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
}
@property
def feedback_feature(self) -> dict:
return {}
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
)
if PYNPUT_AVAILABLE:
logging.info("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release,
)
self.listener.start()
else:
logging.info("pynput not available - skipping local keyboard listener.")
self.listener = None
self.is_connected = True
def calibrate(self) -> None:
pass
def on_press(self, key):
if hasattr(key, "char"):
self.event_queue.put((key.char, True))
def on_release(self, key):
if hasattr(key, "char"):
self.event_queue.put((key.char, False))
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()
def _drain_pressed_keys(self):
while not self.event_queue.empty():
key_char, is_pressed = self.event_queue.get_nowait()
self.current_pressed[key_char] = is_pressed
def get_action(self) -> np.ndarray:
before_read_t = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
self._drain_pressed_keys()
# Generate action based on current key states
action = {key for key, val in self.current_pressed.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return np.array(list(action))
def send_feedback(self, feedback: np.ndarray) -> None:
pass
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`."
)
if self.listener is not None:
self.listener.stop()
self.is_connected = False