refactor/lekiwi robot (#863)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Simon Alibert <simon.alibert@huggingface.co>
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Steven Palma
2025-04-29 17:48:41 +02:00
committed by GitHub
parent b7a9b0689a
commit 2e528a8b12
21 changed files with 1161 additions and 1744 deletions

View File

@@ -22,4 +22,5 @@ from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("keyboard")
@dataclass
class KeyboardTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Consider setting in here the keys that we want to capture/listen
mock: bool = False

View File

@@ -19,8 +19,7 @@ import os
import sys
import time
from queue import Queue
import numpy as np
from typing import Any
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
@@ -59,48 +58,50 @@ class KeyboardTeleop(Teleoperator):
self.event_queue = Queue()
self.current_pressed = {}
self.listener = None
self.is_connected = False
self.logs = {}
@property
def action_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
}
# TODO(Steven): Change this when we agree what should this return
...
@property
def feedback_feature(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
return PYNPUT_AVAILABLE and isinstance(self.listener, keyboard.Listener) and self.listener.is_alive()
@property
def is_calibrated(self) -> bool:
pass
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
"Keyboard is already connected. Do not run `robot.connect()` twice."
)
if PYNPUT_AVAILABLE:
logging.info("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release,
on_press=self._on_press,
on_release=self._on_release,
)
self.listener.start()
else:
logging.info("pynput not available - skipping local keyboard listener.")
self.listener = None
self.is_connected = True
def calibrate(self) -> None:
pass
def on_press(self, key):
def _on_press(self, key):
if hasattr(key, "char"):
self.event_queue.put((key.char, True))
def on_release(self, key):
def _on_release(self, key):
if hasattr(key, "char"):
self.event_queue.put((key.char, False))
if key == keyboard.Key.esc:
@@ -112,10 +113,13 @@ class KeyboardTeleop(Teleoperator):
key_char, is_pressed = self.event_queue.get_nowait()
self.current_pressed[key_char] = is_pressed
def get_action(self) -> np.ndarray:
def configure(self):
pass
def get_action(self) -> dict[str, Any]:
before_read_t = time.perf_counter()
if not self.is_connected:
if not self._is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
@@ -126,9 +130,9 @@ class KeyboardTeleop(Teleoperator):
action = {key for key, val in self.current_pressed.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return np.array(list(action))
return dict.fromkeys(action, None)
def send_feedback(self, feedback: np.ndarray) -> None:
def send_feedback(self, feedback: dict[str, Any]) -> None:
pass
def disconnect(self) -> None:
@@ -138,5 +142,3 @@ class KeyboardTeleop(Teleoperator):
)
if self.listener is not None:
self.listener.stop()
self.is_connected = False

View File

@@ -1,28 +0,0 @@
import logging
import time
from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
def main():
logging.info("Configuring Keyboard Teleop")
keyboard_config = KeyboardTeleopConfig()
keyboard = KeyboardTeleop(keyboard_config)
logging.info("Connecting Keyboard Teleop")
keyboard.connect()
logging.info("Starting Keyboard capture")
i = 0
while i < 20:
action = keyboard.get_action()
print("Captured keys: %s", action)
time.sleep(1)
i += 1
keyboard.disconnect()
logging.info("Finished LeKiwiRobot cleanly")
if __name__ == "__main__":
main()