From 1522e60f8356e1ed1da84f02b44149294e366a36 Mon Sep 17 00:00:00 2001 From: Pepijn <138571049+pkooij@users.noreply.github.com> Date: Wed, 2 Jul 2025 11:41:20 +0200 Subject: [PATCH] feat: Add fixes and refactor lekiwi example (#1396) * feat: Add fixes and refactor lekiwi example * fix: replace repo_id with placeholders * feat: use record_loop for lekiwi, use same control strucutre as record.py * feat: make rerun log more general for lekiwi * fix: add comments record_loop and fix params evaluate.py * fix: add events in evaluate.py * fix: add events 2 * change record to display data * Integrate feedback steven * Add docs merging * fix: add lekiwi name check * fix: integrate feedback steven * fix: list for type * fix: check type list * remove second robot connect * fix: added file when merging * fix(record): account for edge cases when teleop is a list --------- Co-authored-by: Steven Palma --- examples/lekiwi/evaluate.py | 92 +++++++++++++++---- examples/lekiwi/record.py | 100 ++++++++++++++------- examples/lekiwi/replay.py | 18 ++-- examples/lekiwi/teleoperate.py | 47 ++++++---- src/lerobot/record.py | 46 +++++++--- src/lerobot/robots/lekiwi/lekiwi_client.py | 29 +++--- src/lerobot/teleoperate.py | 13 +-- src/lerobot/utils/visualization_utils.py | 20 +++++ 8 files changed, 255 insertions(+), 110 deletions(-) diff --git a/examples/lekiwi/evaluate.py b/examples/lekiwi/evaluate.py index 60526e6d9..a8aef6a28 100644 --- a/examples/lekiwi/evaluate.py +++ b/examples/lekiwi/evaluate.py @@ -1,32 +1,90 @@ -from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.utils import hw_to_dataset_features from lerobot.policies.act.modeling_act import ACTPolicy +from lerobot.record import record_loop from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig -from lerobot.utils.control_utils import predict_action -from lerobot.utils.utils import get_safe_torch_device +from lerobot.utils.control_utils import init_keyboard_listener +from lerobot.utils.utils import log_say +from lerobot.utils.visualization_utils import _init_rerun -NB_CYCLES_CLIENT_CONNECTION = 1000 +NUM_EPISODES = 2 +FPS = 30 +EPISODE_TIME_SEC = 60 +TASK_DESCRIPTION = "My task description" +# Create the robot and teleoperator configurations robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") robot = LeKiwiClient(robot_config) +policy = ACTPolicy.from_pretrained("/") + +# Configure the dataset features +action_features = hw_to_dataset_features(robot.action_features, "action") +obs_features = hw_to_dataset_features(robot.observation_features, "observation") +dataset_features = {**action_features, **obs_features} + +# Create the dataset +dataset = LeRobotDataset.create( + repo_id="/", + fps=FPS, + features=dataset_features, + robot_type=robot.name, + use_videos=True, + image_writer_threads=4, +) + +# To connect you already should have this script running on LeKiwi: `python -m lerobot.common.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi` robot.connect() -policy = ACTPolicy.from_pretrained("pepijn223/act_lekiwi_circle") -policy.reset() +_init_rerun(session_name="recording") -obs_features = hw_to_dataset_features(robot.observation_features, "observation") +listener, events = init_keyboard_listener() -print("Running inference") -i = 0 -while i < NB_CYCLES_CLIENT_CONNECTION: - obs = robot.get_observation() +if not robot.is_connected: + raise ValueError("Robot is not connected!") - observation_frame = build_dataset_frame(obs_features, obs, prefix="observation") - action_values = predict_action( - observation_frame, policy, get_safe_torch_device(policy.config.device), policy.config.use_amp +recorded_episodes = 0 +while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Running inference, recording eval episode {recorded_episodes} of {NUM_EPISODES}") + + # Run the policy inference loop + record_loop( + robot=robot, + events=events, + fps=FPS, + policy=policy, + dataset=dataset, + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, ) - action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)} - robot.send_action(action) - i += 1 + + # Logic for reset env + if not events["stop_recording"] and ( + (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + ) + + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue + + dataset.save_episode() + recorded_episodes += 1 + +# Upload to hub and clean up +dataset.push_to_hub() robot.disconnect() +listener.stop() diff --git a/examples/lekiwi/record.py b/examples/lekiwi/record.py index 2ad32677f..2e8b9f355 100644 --- a/examples/lekiwi/record.py +++ b/examples/lekiwi/record.py @@ -1,67 +1,101 @@ -import time - from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.utils import hw_to_dataset_features +from lerobot.record import record_loop from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient from lerobot.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig +from lerobot.utils.control_utils import init_keyboard_listener +from lerobot.utils.utils import log_say +from lerobot.utils.visualization_utils import _init_rerun -NB_CYCLES_CLIENT_CONNECTION = 250 - -leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem58760431551") -leader_arm = SO100Leader(leader_arm_config) +NUM_EPISODES = 3 +FPS = 30 +EPISODE_TIME_SEC = 30 +RESET_TIME_SEC = 10 +TASK_DESCRIPTION = "My task description" +# Create the robot and teleoperator configurations +robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") +leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm") keyboard_config = KeyboardTeleopConfig() + +robot = LeKiwiClient(robot_config) +leader_arm = SO100Leader(leader_arm_config) keyboard = KeyboardTeleop(keyboard_config) -robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") -robot = LeKiwiClient(robot_config) - +# Configure the dataset features action_features = hw_to_dataset_features(robot.action_features, "action") obs_features = hw_to_dataset_features(robot.observation_features, "observation") dataset_features = {**action_features, **obs_features} +# Create the dataset dataset = LeRobotDataset.create( - repo_id="pepijn223/lekiwi" + str(int(time.time())), - fps=10, + repo_id="/", + fps=FPS, features=dataset_features, robot_type=robot.name, + use_videos=True, + image_writer_threads=4, ) +# To connect you already should have this script running on LeKiwi: `python -m lerobot.common.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi` +robot.connect() leader_arm.connect() keyboard.connect() -robot.connect() + +_init_rerun(session_name="lekiwi_record") + +listener, events = init_keyboard_listener() if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: - exit() + raise ValueError("Robot, leader arm of keyboard is not connected!") -print("Starting LeKiwi recording") -i = 0 -while i < NB_CYCLES_CLIENT_CONNECTION: - arm_action = leader_arm.get_action() - arm_action = {f"arm_{k}": v for k, v in arm_action.items()} +recorded_episodes = 0 +while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Recording episode {recorded_episodes}") - keyboard_keys = keyboard.get_action() + # Run the record loop + record_loop( + robot=robot, + events=events, + fps=FPS, + dataset=dataset, + teleop=[leader_arm, keyboard], + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + ) - base_action = robot._from_keyboard_to_base_action(keyboard_keys) + # Logic for reset env + if not events["stop_recording"] and ( + (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + teleop=[leader_arm, keyboard], + control_time_s=RESET_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + ) - action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - action_sent = robot.send_action(action) - observation = robot.get_observation() + dataset.save_episode() + recorded_episodes += 1 - frame = {**action_sent, **observation} - task = "Dummy Example Task Dataset" +# Upload to hub and clean up +dataset.push_to_hub() - dataset.add_frame(frame, task) - i += 1 - -print("Disconnecting Teleop Devices and LeKiwi Client") robot.disconnect() leader_arm.disconnect() keyboard.disconnect() - -print("Uploading dataset to the hub") -dataset.save_episode() -dataset.push_to_hub() +listener.stop() diff --git a/examples/lekiwi/replay.py b/examples/lekiwi/replay.py index 2c49fc583..248354df9 100644 --- a/examples/lekiwi/replay.py +++ b/examples/lekiwi/replay.py @@ -4,22 +4,30 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient from lerobot.utils.robot_utils import busy_wait +from lerobot.utils.utils import log_say + +EPISODE_IDX = 0 robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") robot = LeKiwiClient(robot_config) -dataset = LeRobotDataset("pepijn223/lekiwi1749025613", episodes=[0]) +dataset = LeRobotDataset("/", episodes=[EPISODE_IDX]) +actions = dataset.hf_dataset.select_columns("action") robot.connect() -print("Replaying episode…") -for _, action_array in enumerate(dataset.hf_dataset["action"]): +if not robot.is_connected: + raise ValueError("Robot is not connected!") + +log_say(f"Replaying episode {EPISODE_IDX}") +for idx in range(dataset.num_frames): t0 = time.perf_counter() - action = {name: float(action_array[i]) for i, name in enumerate(dataset.features["action"]["names"])} + action = { + name: float(actions[idx]["action"][i]) for i, name in enumerate(dataset.features["action"]["names"]) + } robot.send_action(action) busy_wait(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) -print("Disconnecting LeKiwi Client") robot.disconnect() diff --git a/examples/lekiwi/teleoperate.py b/examples/lekiwi/teleoperate.py index d7218dfcd..448610e2b 100644 --- a/examples/lekiwi/teleoperate.py +++ b/examples/lekiwi/teleoperate.py @@ -1,32 +1,47 @@ +import time + from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop, KeyboardTeleopConfig from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig +from lerobot.utils.robot_utils import busy_wait +from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data +FPS = 30 + +# Create the robot and teleoperator configurations robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="my_lekiwi") - -teleop__arm_config = SO100LeaderConfig( - port="/dev/tty.usbmodem58760431551", - id="my_awesome_leader_arm", -) - -teleop_keyboard_config = KeyboardTeleopConfig( - id="my_laptop_keyboard", -) +teleop_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm") +keyboard_config = KeyboardTeleopConfig(id="my_laptop_keyboard") robot = LeKiwiClient(robot_config) -teleop_arm = SO100Leader(teleop__arm_config) -telep_keyboard = KeyboardTeleop(teleop_keyboard_config) +leader_arm = SO100Leader(teleop_arm_config) +keyboard = KeyboardTeleop(keyboard_config) + +# To connect you already should have this script running on LeKiwi: `python -m lerobot.common.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi` robot.connect() -teleop_arm.connect() -telep_keyboard.connect() +leader_arm.connect() +keyboard.connect() + +_init_rerun(session_name="lekiwi_teleop") + +if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: + raise ValueError("Robot, leader arm of keyboard is not connected!") while True: + t0 = time.perf_counter() + observation = robot.get_observation() - arm_action = teleop_arm.get_action() + arm_action = leader_arm.get_action() arm_action = {f"arm_{k}": v for k, v in arm_action.items()} - keyboard_keys = telep_keyboard.get_action() + keyboard_keys = keyboard.get_action() base_action = robot._from_keyboard_to_base_action(keyboard_keys) - robot.send_action(arm_action | base_action) + log_rerun_data(observation, {**arm_action, **base_action}) + + action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action + + robot.send_action(action) + + busy_wait(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) diff --git a/src/lerobot/record.py b/src/lerobot/record.py index 54d7f3952..635bdf1e4 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -40,9 +40,7 @@ import time from dataclasses import asdict, dataclass from pathlib import Path from pprint import pformat - -import numpy as np -import rerun as rr +from typing import List from lerobot.cameras import ( # noqa: F401 CameraConfig, # noqa: F401 @@ -72,6 +70,7 @@ from lerobot.teleoperators import ( # noqa: F401 so100_leader, so101_leader, ) +from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop from lerobot.utils.control_utils import ( init_keyboard_listener, is_headless, @@ -85,7 +84,7 @@ from lerobot.utils.utils import ( init_logging, log_say, ) -from lerobot.utils.visualization_utils import _init_rerun +from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data @dataclass @@ -165,7 +164,7 @@ def record_loop( events: dict, fps: int, dataset: LeRobotDataset | None = None, - teleop: Teleoperator | None = None, + teleop: Teleoperator | List[Teleoperator] | None = None, policy: PreTrainedPolicy | None = None, control_time_s: int | None = None, single_task: str | None = None, @@ -174,6 +173,23 @@ def record_loop( if dataset is not None and dataset.fps != fps: raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).") + teleop_arm = teleop_keyboard = None + if isinstance(teleop, list): + teleop_keyboard = next((t for t in teleop if isinstance(t, KeyboardTeleop)), None) + teleop_arm = next( + ( + t + for t in teleop + if isinstance(t, (so100_leader.SO100Leader, so101_leader.SO101Leader, koch_leader.KochLeader)) + ), + None, + ) + + if not (teleop_arm and teleop_keyboard and len(teleop) == 2 and robot.name == "lekiwi_client"): + raise ValueError( + "For multi-teleop, the list must contain exactly one KeyboardTeleop and one arm teleoperator. Currently only supported for LeKiwi robot." + ) + # if policy is given it needs cleaning up if policy is not None: policy.reset() @@ -202,8 +218,17 @@ def record_loop( robot_type=robot.robot_type, ) action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)} - elif policy is None and teleop is not None: + elif policy is None and isinstance(teleop, Teleoperator): action = teleop.get_action() + elif policy is None and isinstance(teleop, list): + # TODO(pepijn, steven): clean the record loop for use of multiple robots (possibly with pipeline) + arm_action = teleop_arm.get_action() + arm_action = {f"arm_{k}": v for k, v in arm_action.items()} + + keyboard_action = teleop_keyboard.get_action() + base_action = robot._from_keyboard_to_base_action(keyboard_action) + + action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action else: logging.info( "No policy or teleoperator provided, skipping action generation." @@ -222,14 +247,7 @@ def record_loop( dataset.add_frame(frame, task=single_task) if display_data: - for obs, val in observation.items(): - if isinstance(val, float): - rr.log(f"observation.{obs}", rr.Scalar(val)) - elif isinstance(val, np.ndarray): - rr.log(f"observation.{obs}", rr.Image(val), static=True) - for act, val in action.items(): - if isinstance(val, float): - rr.log(f"action.{act}", rr.Scalar(val)) + log_rerun_data(observation, action) dt_s = time.perf_counter() - start_loop_t busy_wait(1 / fps - dt_s) diff --git a/src/lerobot/robots/lekiwi/lekiwi_client.py b/src/lerobot/robots/lekiwi/lekiwi_client.py index ffde1c852..0ce259bb6 100644 --- a/src/lerobot/robots/lekiwi/lekiwi_client.py +++ b/src/lerobot/robots/lekiwi/lekiwi_client.py @@ -22,7 +22,6 @@ from typing import Any, Dict, Optional, Tuple import cv2 import numpy as np -import torch import zmq from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError @@ -195,26 +194,23 @@ class LeKiwiClient(Robot): self, observation: Dict[str, Any] ) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]: """Extracts frames, and state from the parsed observation.""" - flat_state = {key: value for key, value in observation.items() if key in self._state_ft} - state_vec = np.array( - [flat_state.get(k, 0.0) for k in self._state_order], - dtype=np.float32, - ) + flat_state = {key: observation.get(key, 0.0) for key in self._state_order} + + state_vec = np.array([flat_state[key] for key in self._state_order], dtype=np.float32) + + obs_dict: Dict[str, Any] = {**flat_state, "observation.state": state_vec} # Decode images - image_observation = { - f"observation.images.{key}": value - for key, value in observation.items() - if key in self._cameras_ft - } current_frames: Dict[str, np.ndarray] = {} - for cam_name, image_b64 in image_observation.items(): + for cam_name, image_b64 in observation.items(): + if cam_name not in self._cameras_ft: + continue frame = self._decode_image_from_b64(image_b64) if frame is not None: current_frames[cam_name] = frame - return current_frames, {"observation.state": state_vec} + return current_frames, obs_dict def _get_data(self) -> Tuple[Dict[str, np.ndarray], Dict[str, Any], Dict[str, Any]]: """ @@ -267,7 +263,7 @@ class LeKiwiClient(Robot): if frame is None: logging.warning("Frame is None") frame = np.zeros((640, 480, 3), dtype=np.uint8) - obs_dict[cam_name] = torch.from_numpy(frame) + obs_dict[cam_name] = frame return obs_dict @@ -327,7 +323,10 @@ class LeKiwiClient(Robot): # TODO(Steven): Remove the np conversion when it is possible to record a non-numpy array value actions = np.array([action.get(k, 0.0) for k in self._state_order], dtype=np.float32) - return {"action": actions} + + action_sent = {key: actions[i] for i, key in enumerate(self._state_order)} + action_sent["action"] = actions + return action_sent def disconnect(self): """Cleans ZMQ comms""" diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index 20b4925f2..e2819345b 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -36,7 +36,6 @@ from dataclasses import asdict, dataclass from pprint import pformat import draccus -import numpy as np import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 @@ -60,11 +59,12 @@ from lerobot.teleoperators import ( # noqa: F401 ) from lerobot.utils.robot_utils import busy_wait from lerobot.utils.utils import init_logging, move_cursor_up -from lerobot.utils.visualization_utils import _init_rerun +from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data @dataclass class TeleoperateConfig: + # TODO: pepijn, steven: if more robots require multiple teleoperators (like lekiwi) its good to make this possibele in teleop.py and record.py with List[Teleoperator] teleop: TeleoperatorConfig robot: RobotConfig # Limit the maximum frames per second. @@ -84,14 +84,7 @@ def teleop_loop( action = teleop.get_action() if display_data: observation = robot.get_observation() - for obs, val in observation.items(): - if isinstance(val, float): - rr.log(f"observation_{obs}", rr.Scalar(val)) - elif isinstance(val, np.ndarray): - rr.log(f"observation_{obs}", rr.Image(val), static=True) - for act, val in action.items(): - if isinstance(val, float): - rr.log(f"action_{act}", rr.Scalar(val)) + log_rerun_data(observation, action) robot.send_action(action) dt_s = time.perf_counter() - loop_start diff --git a/src/lerobot/utils/visualization_utils.py b/src/lerobot/utils/visualization_utils.py index dfffece5f..4a5516549 100644 --- a/src/lerobot/utils/visualization_utils.py +++ b/src/lerobot/utils/visualization_utils.py @@ -13,7 +13,9 @@ # limitations under the License. import os +from typing import Any +import numpy as np import rerun as rr @@ -24,3 +26,21 @@ def _init_rerun(session_name: str = "lerobot_control_loop") -> None: rr.init(session_name) memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%") rr.spawn(memory_limit=memory_limit) + + +def log_rerun_data(observation: dict[str | Any], action: dict[str | Any]): + for obs, val in observation.items(): + if isinstance(val, float): + rr.log(f"observation.{obs}", rr.Scalar(val)) + elif isinstance(val, np.ndarray): + if val.ndim == 1: + for i, v in enumerate(val): + rr.log(f"observation.{obs}_{i}", rr.Scalar(float(v))) + else: + rr.log(f"observation.{obs}", rr.Image(val), static=True) + for act, val in action.items(): + if isinstance(val, float): + rr.log(f"action.{act}", rr.Scalar(val)) + elif isinstance(val, np.ndarray): + for i, v in enumerate(val): + rr.log(f"action.{act}_{i}", rr.Scalar(float(v)))