feat: Add fixes and refactor lekiwi example (#1396)

* feat: Add fixes and refactor lekiwi example

* fix: replace repo_id with placeholders

* feat: use record_loop for lekiwi, use same control strucutre as record.py

* feat: make rerun log more general for lekiwi

* fix: add comments record_loop and fix params evaluate.py

* fix: add events in evaluate.py

* fix: add events 2

* change record to display data

* Integrate feedback steven

* Add docs merging

* fix: add lekiwi name check

* fix: integrate feedback steven

* fix: list for type

* fix: check type list

* remove second robot connect

* fix: added file when merging

* fix(record): account for edge cases when teleop is a list

---------

Co-authored-by: Steven Palma <steven.palma@huggingface.co>
This commit is contained in:
Pepijn
2025-07-02 11:41:20 +02:00
committed by GitHub
parent d4ee470b00
commit 1522e60f83
8 changed files with 255 additions and 110 deletions

View File

@@ -1,32 +1,90 @@
from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.record import record_loop
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
from lerobot.utils.control_utils import predict_action
from lerobot.utils.utils import get_safe_torch_device
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import _init_rerun
NB_CYCLES_CLIENT_CONNECTION = 1000
NUM_EPISODES = 2
FPS = 30
EPISODE_TIME_SEC = 60
TASK_DESCRIPTION = "My task description"
# Create the robot and teleoperator configurations
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
robot = LeKiwiClient(robot_config)
policy = ACTPolicy.from_pretrained("<hf_username>/<policy_repo_id>")
# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, "action")
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
dataset_features = {**action_features, **obs_features}
# Create the dataset
dataset = LeRobotDataset.create(
repo_id="<hf_username>/<eval_dataset_repo_id>",
fps=FPS,
features=dataset_features,
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# To connect you already should have this script running on LeKiwi: `python -m lerobot.common.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
policy = ACTPolicy.from_pretrained("pepijn223/act_lekiwi_circle")
policy.reset()
_init_rerun(session_name="recording")
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
listener, events = init_keyboard_listener()
print("Running inference")
i = 0
while i < NB_CYCLES_CLIENT_CONNECTION:
obs = robot.get_observation()
if not robot.is_connected:
raise ValueError("Robot is not connected!")
observation_frame = build_dataset_frame(obs_features, obs, prefix="observation")
action_values = predict_action(
observation_frame, policy, get_safe_torch_device(policy.config.device), policy.config.use_amp
recorded_episodes = 0
while recorded_episodes < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Running inference, recording eval episode {recorded_episodes} of {NUM_EPISODES}")
# Run the policy inference loop
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)}
robot.send_action(action)
i += 1
# Logic for reset env
if not events["stop_recording"] and (
(recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=robot,
events=events,
fps=FPS,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
if events["rerecord_episode"]:
log_say("Re-record episode")
events["rerecord_episode"] = False
events["exit_early"] = False
dataset.clear_episode_buffer()
continue
dataset.save_episode()
recorded_episodes += 1
# Upload to hub and clean up
dataset.push_to_hub()
robot.disconnect()
listener.stop()

View File

@@ -1,67 +1,101 @@
import time
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.record import record_loop
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import _init_rerun
NB_CYCLES_CLIENT_CONNECTION = 250
leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem58760431551")
leader_arm = SO100Leader(leader_arm_config)
NUM_EPISODES = 3
FPS = 30
EPISODE_TIME_SEC = 30
RESET_TIME_SEC = 10
TASK_DESCRIPTION = "My task description"
# Create the robot and teleoperator configurations
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm")
keyboard_config = KeyboardTeleopConfig()
robot = LeKiwiClient(robot_config)
leader_arm = SO100Leader(leader_arm_config)
keyboard = KeyboardTeleop(keyboard_config)
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
robot = LeKiwiClient(robot_config)
# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, "action")
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
dataset_features = {**action_features, **obs_features}
# Create the dataset
dataset = LeRobotDataset.create(
repo_id="pepijn223/lekiwi" + str(int(time.time())),
fps=10,
repo_id="<hf_username>/<dataset_repo_id>",
fps=FPS,
features=dataset_features,
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# To connect you already should have this script running on LeKiwi: `python -m lerobot.common.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
leader_arm.connect()
keyboard.connect()
robot.connect()
_init_rerun(session_name="lekiwi_record")
listener, events = init_keyboard_listener()
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
exit()
raise ValueError("Robot, leader arm of keyboard is not connected!")
print("Starting LeKiwi recording")
i = 0
while i < NB_CYCLES_CLIENT_CONNECTION:
arm_action = leader_arm.get_action()
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
recorded_episodes = 0
while recorded_episodes < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Recording episode {recorded_episodes}")
keyboard_keys = keyboard.get_action()
# Run the record loop
record_loop(
robot=robot,
events=events,
fps=FPS,
dataset=dataset,
teleop=[leader_arm, keyboard],
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
base_action = robot._from_keyboard_to_base_action(keyboard_keys)
# Logic for reset env
if not events["stop_recording"] and (
(recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=robot,
events=events,
fps=FPS,
teleop=[leader_arm, keyboard],
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
if events["rerecord_episode"]:
log_say("Re-record episode")
events["rerecord_episode"] = False
events["exit_early"] = False
dataset.clear_episode_buffer()
continue
action_sent = robot.send_action(action)
observation = robot.get_observation()
dataset.save_episode()
recorded_episodes += 1
frame = {**action_sent, **observation}
task = "Dummy Example Task Dataset"
# Upload to hub and clean up
dataset.push_to_hub()
dataset.add_frame(frame, task)
i += 1
print("Disconnecting Teleop Devices and LeKiwi Client")
robot.disconnect()
leader_arm.disconnect()
keyboard.disconnect()
print("Uploading dataset to the hub")
dataset.save_episode()
dataset.push_to_hub()
listener.stop()

View File

@@ -4,22 +4,30 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.utils import log_say
EPISODE_IDX = 0
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
robot = LeKiwiClient(robot_config)
dataset = LeRobotDataset("pepijn223/lekiwi1749025613", episodes=[0])
dataset = LeRobotDataset("<hf_username>/<dataset_repo_id>", episodes=[EPISODE_IDX])
actions = dataset.hf_dataset.select_columns("action")
robot.connect()
print("Replaying episode…")
for _, action_array in enumerate(dataset.hf_dataset["action"]):
if not robot.is_connected:
raise ValueError("Robot is not connected!")
log_say(f"Replaying episode {EPISODE_IDX}")
for idx in range(dataset.num_frames):
t0 = time.perf_counter()
action = {name: float(action_array[i]) for i, name in enumerate(dataset.features["action"]["names"])}
action = {
name: float(actions[idx]["action"][i]) for i, name in enumerate(dataset.features["action"]["names"])
}
robot.send_action(action)
busy_wait(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0))
print("Disconnecting LeKiwi Client")
robot.disconnect()

View File

@@ -1,32 +1,47 @@
import time
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop, KeyboardTeleopConfig
from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
FPS = 30
# Create the robot and teleoperator configurations
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="my_lekiwi")
teleop__arm_config = SO100LeaderConfig(
port="/dev/tty.usbmodem58760431551",
id="my_awesome_leader_arm",
)
teleop_keyboard_config = KeyboardTeleopConfig(
id="my_laptop_keyboard",
)
teleop_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm")
keyboard_config = KeyboardTeleopConfig(id="my_laptop_keyboard")
robot = LeKiwiClient(robot_config)
teleop_arm = SO100Leader(teleop__arm_config)
telep_keyboard = KeyboardTeleop(teleop_keyboard_config)
leader_arm = SO100Leader(teleop_arm_config)
keyboard = KeyboardTeleop(keyboard_config)
# To connect you already should have this script running on LeKiwi: `python -m lerobot.common.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
teleop_arm.connect()
telep_keyboard.connect()
leader_arm.connect()
keyboard.connect()
_init_rerun(session_name="lekiwi_teleop")
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
raise ValueError("Robot, leader arm of keyboard is not connected!")
while True:
t0 = time.perf_counter()
observation = robot.get_observation()
arm_action = teleop_arm.get_action()
arm_action = leader_arm.get_action()
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
keyboard_keys = telep_keyboard.get_action()
keyboard_keys = keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_keys)
robot.send_action(arm_action | base_action)
log_rerun_data(observation, {**arm_action, **base_action})
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
robot.send_action(action)
busy_wait(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))

View File

@@ -40,9 +40,7 @@ import time
from dataclasses import asdict, dataclass
from pathlib import Path
from pprint import pformat
import numpy as np
import rerun as rr
from typing import List
from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
@@ -72,6 +70,7 @@ from lerobot.teleoperators import ( # noqa: F401
so100_leader,
so101_leader,
)
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
from lerobot.utils.control_utils import (
init_keyboard_listener,
is_headless,
@@ -85,7 +84,7 @@ from lerobot.utils.utils import (
init_logging,
log_say,
)
from lerobot.utils.visualization_utils import _init_rerun
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
@dataclass
@@ -165,7 +164,7 @@ def record_loop(
events: dict,
fps: int,
dataset: LeRobotDataset | None = None,
teleop: Teleoperator | None = None,
teleop: Teleoperator | List[Teleoperator] | None = None,
policy: PreTrainedPolicy | None = None,
control_time_s: int | None = None,
single_task: str | None = None,
@@ -174,6 +173,23 @@ def record_loop(
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
teleop_arm = teleop_keyboard = None
if isinstance(teleop, list):
teleop_keyboard = next((t for t in teleop if isinstance(t, KeyboardTeleop)), None)
teleop_arm = next(
(
t
for t in teleop
if isinstance(t, (so100_leader.SO100Leader, so101_leader.SO101Leader, koch_leader.KochLeader))
),
None,
)
if not (teleop_arm and teleop_keyboard and len(teleop) == 2 and robot.name == "lekiwi_client"):
raise ValueError(
"For multi-teleop, the list must contain exactly one KeyboardTeleop and one arm teleoperator. Currently only supported for LeKiwi robot."
)
# if policy is given it needs cleaning up
if policy is not None:
policy.reset()
@@ -202,8 +218,17 @@ def record_loop(
robot_type=robot.robot_type,
)
action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)}
elif policy is None and teleop is not None:
elif policy is None and isinstance(teleop, Teleoperator):
action = teleop.get_action()
elif policy is None and isinstance(teleop, list):
# TODO(pepijn, steven): clean the record loop for use of multiple robots (possibly with pipeline)
arm_action = teleop_arm.get_action()
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
keyboard_action = teleop_keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_action)
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
else:
logging.info(
"No policy or teleoperator provided, skipping action generation."
@@ -222,14 +247,7 @@ def record_loop(
dataset.add_frame(frame, task=single_task)
if display_data:
for obs, val in observation.items():
if isinstance(val, float):
rr.log(f"observation.{obs}", rr.Scalar(val))
elif isinstance(val, np.ndarray):
rr.log(f"observation.{obs}", rr.Image(val), static=True)
for act, val in action.items():
if isinstance(val, float):
rr.log(f"action.{act}", rr.Scalar(val))
log_rerun_data(observation, action)
dt_s = time.perf_counter() - start_loop_t
busy_wait(1 / fps - dt_s)

View File

@@ -22,7 +22,6 @@ from typing import Any, Dict, Optional, Tuple
import cv2
import numpy as np
import torch
import zmq
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
@@ -195,26 +194,23 @@ class LeKiwiClient(Robot):
self, observation: Dict[str, Any]
) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
"""Extracts frames, and state from the parsed observation."""
flat_state = {key: value for key, value in observation.items() if key in self._state_ft}
state_vec = np.array(
[flat_state.get(k, 0.0) for k in self._state_order],
dtype=np.float32,
)
flat_state = {key: observation.get(key, 0.0) for key in self._state_order}
state_vec = np.array([flat_state[key] for key in self._state_order], dtype=np.float32)
obs_dict: Dict[str, Any] = {**flat_state, "observation.state": state_vec}
# Decode images
image_observation = {
f"observation.images.{key}": value
for key, value in observation.items()
if key in self._cameras_ft
}
current_frames: Dict[str, np.ndarray] = {}
for cam_name, image_b64 in image_observation.items():
for cam_name, image_b64 in observation.items():
if cam_name not in self._cameras_ft:
continue
frame = self._decode_image_from_b64(image_b64)
if frame is not None:
current_frames[cam_name] = frame
return current_frames, {"observation.state": state_vec}
return current_frames, obs_dict
def _get_data(self) -> Tuple[Dict[str, np.ndarray], Dict[str, Any], Dict[str, Any]]:
"""
@@ -267,7 +263,7 @@ class LeKiwiClient(Robot):
if frame is None:
logging.warning("Frame is None")
frame = np.zeros((640, 480, 3), dtype=np.uint8)
obs_dict[cam_name] = torch.from_numpy(frame)
obs_dict[cam_name] = frame
return obs_dict
@@ -327,7 +323,10 @@ class LeKiwiClient(Robot):
# TODO(Steven): Remove the np conversion when it is possible to record a non-numpy array value
actions = np.array([action.get(k, 0.0) for k in self._state_order], dtype=np.float32)
return {"action": actions}
action_sent = {key: actions[i] for i, key in enumerate(self._state_order)}
action_sent["action"] = actions
return action_sent
def disconnect(self):
"""Cleans ZMQ comms"""

View File

@@ -36,7 +36,6 @@ from dataclasses import asdict, dataclass
from pprint import pformat
import draccus
import numpy as np
import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
@@ -60,11 +59,12 @@ from lerobot.teleoperators import ( # noqa: F401
)
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.utils import init_logging, move_cursor_up
from lerobot.utils.visualization_utils import _init_rerun
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
@dataclass
class TeleoperateConfig:
# TODO: pepijn, steven: if more robots require multiple teleoperators (like lekiwi) its good to make this possibele in teleop.py and record.py with List[Teleoperator]
teleop: TeleoperatorConfig
robot: RobotConfig
# Limit the maximum frames per second.
@@ -84,14 +84,7 @@ def teleop_loop(
action = teleop.get_action()
if display_data:
observation = robot.get_observation()
for obs, val in observation.items():
if isinstance(val, float):
rr.log(f"observation_{obs}", rr.Scalar(val))
elif isinstance(val, np.ndarray):
rr.log(f"observation_{obs}", rr.Image(val), static=True)
for act, val in action.items():
if isinstance(val, float):
rr.log(f"action_{act}", rr.Scalar(val))
log_rerun_data(observation, action)
robot.send_action(action)
dt_s = time.perf_counter() - loop_start

View File

@@ -13,7 +13,9 @@
# limitations under the License.
import os
from typing import Any
import numpy as np
import rerun as rr
@@ -24,3 +26,21 @@ def _init_rerun(session_name: str = "lerobot_control_loop") -> None:
rr.init(session_name)
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
rr.spawn(memory_limit=memory_limit)
def log_rerun_data(observation: dict[str | Any], action: dict[str | Any]):
for obs, val in observation.items():
if isinstance(val, float):
rr.log(f"observation.{obs}", rr.Scalar(val))
elif isinstance(val, np.ndarray):
if val.ndim == 1:
for i, v in enumerate(val):
rr.log(f"observation.{obs}_{i}", rr.Scalar(float(v)))
else:
rr.log(f"observation.{obs}", rr.Image(val), static=True)
for act, val in action.items():
if isinstance(val, float):
rr.log(f"action.{act}", rr.Scalar(val))
elif isinstance(val, np.ndarray):
for i, v in enumerate(val):
rr.log(f"action.{act}_{i}", rr.Scalar(float(v)))