forked from tangger/lerobot
Match motor names with ids lekiwi (#1261)
This commit is contained in:
@@ -23,7 +23,7 @@ obs_features = hw_to_dataset_features(robot.observation_features, "observation")
|
||||
dataset_features = {**action_features, **obs_features}
|
||||
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id="user/lekiwi" + str(int(time.time())),
|
||||
repo_id="pepijn223/lekiwi" + str(int(time.time())),
|
||||
fps=10,
|
||||
features=dataset_features,
|
||||
robot_type=robot.name,
|
||||
@@ -36,7 +36,7 @@ robot.connect()
|
||||
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
|
||||
exit()
|
||||
|
||||
print("Starting LeKiwi teleoperation")
|
||||
print("Starting LeKiwi recording")
|
||||
i = 0
|
||||
while i < NB_CYCLES_CLIENT_CONNECTION:
|
||||
arm_action = leader_arm.get_action()
|
||||
|
||||
@@ -20,6 +20,17 @@ from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfi
|
||||
from ..config import RobotConfig
|
||||
|
||||
|
||||
def lekiwi_cameras_config() -> dict[str, CameraConfig]:
|
||||
return {
|
||||
"front": OpenCVCameraConfig(
|
||||
index_or_path="/dev/video0", fps=30, width=640, height=480, rotation=Cv2Rotation.ROTATE_180
|
||||
),
|
||||
"wrist": OpenCVCameraConfig(
|
||||
index_or_path="/dev/video2", fps=30, width=480, height=640, rotation=Cv2Rotation.ROTATE_90
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("lekiwi")
|
||||
@dataclass
|
||||
class LeKiwiConfig(RobotConfig):
|
||||
@@ -32,14 +43,7 @@ class LeKiwiConfig(RobotConfig):
|
||||
# the number of motors in your follower arms.
|
||||
max_relative_target: int | None = None
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(
|
||||
default_factory=lambda: {
|
||||
"front": OpenCVCameraConfig(index_or_path="/dev/video0", fps=30, width=640, height=480),
|
||||
"wrist": OpenCVCameraConfig(
|
||||
index_or_path="/dev/video2", fps=30, width=640, height=480, rotation=Cv2Rotation.ROTATE_180
|
||||
),
|
||||
}
|
||||
)
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=lekiwi_cameras_config)
|
||||
|
||||
# Set to `True` for backward compatibility with previous policies/dataset
|
||||
use_degrees: bool = False
|
||||
@@ -86,5 +90,7 @@ class LeKiwiClientConfig(RobotConfig):
|
||||
}
|
||||
)
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=lekiwi_cameras_config)
|
||||
|
||||
polling_timeout_ms: int = 15
|
||||
connect_timeout_s: int = 5
|
||||
|
||||
@@ -23,7 +23,6 @@ from typing import Any
|
||||
import numpy as np
|
||||
|
||||
from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.common.motors.feetech import (
|
||||
@@ -65,8 +64,8 @@ class LeKiwi(Robot):
|
||||
"arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
|
||||
# base
|
||||
"base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100),
|
||||
"base_right_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100),
|
||||
"base_back_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100),
|
||||
"base_back_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100),
|
||||
"base_right_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
@@ -249,7 +248,7 @@ class LeKiwi(Robot):
|
||||
velocity_vector = np.array([x, y, theta_rad])
|
||||
|
||||
# Define the wheel mounting angles with a -90° offset.
|
||||
angles = np.radians(np.array([240, 120, 0]) - 90)
|
||||
angles = np.radians(np.array([240, 0, 120]) - 90)
|
||||
# Build the kinematic matrix: each row maps body velocities to a wheel’s linear speed.
|
||||
# The third column (base_radius) accounts for the effect of rotation.
|
||||
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
|
||||
@@ -295,10 +294,7 @@ class LeKiwi(Robot):
|
||||
base_radius : Distance from the robot center to each wheel (meters).
|
||||
|
||||
Returns:
|
||||
A dict (x_cmd, y_cmd, theta_cmd) where:
|
||||
OBS_STATE.x_cmd : Linear velocity in x (m/s).
|
||||
OBS_STATE.y_cmd : Linear velocity in y (m/s).
|
||||
OBS_STATE.theta_cmd : Rotational velocity in deg/s.
|
||||
A dict (x.vel, y.vel, theta.vel) all in m/s
|
||||
"""
|
||||
|
||||
# Convert each raw command back to an angular speed in deg/s.
|
||||
@@ -316,7 +312,7 @@ class LeKiwi(Robot):
|
||||
wheel_linear_speeds = wheel_radps * wheel_radius
|
||||
|
||||
# Define the wheel mounting angles with a -90° offset.
|
||||
angles = np.radians(np.array([240, 120, 0]) - 90)
|
||||
angles = np.radians(np.array([240, 0, 120]) - 90)
|
||||
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
|
||||
|
||||
# Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds.
|
||||
@@ -347,16 +343,15 @@ class LeKiwi(Robot):
|
||||
|
||||
arm_state = {f"{k}.pos": v for k, v in arm_pos.items()}
|
||||
|
||||
flat_states = {**arm_state, **base_vel}
|
||||
obs_dict = {**arm_state, **base_vel}
|
||||
|
||||
obs_dict = {f"{OBS_STATE}": flat_states}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ import numpy as np
|
||||
import torch
|
||||
import zmq
|
||||
|
||||
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
|
||||
from ..robot import Robot
|
||||
@@ -92,11 +91,8 @@ class LeKiwiClient(Robot):
|
||||
return tuple(self._state_ft.keys())
|
||||
|
||||
@cached_property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
"front": (480, 640, 3),
|
||||
"wrist": (640, 480, 3),
|
||||
}
|
||||
def _cameras_ft(self) -> dict[str, tuple[int, int, int]]:
|
||||
return {name: (cfg.height, cfg.width, 3) for name, cfg in self.config.cameras.items()}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -199,7 +195,7 @@ class LeKiwiClient(Robot):
|
||||
self, observation: Dict[str, Any]
|
||||
) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
|
||||
"""Extracts frames, and state from the parsed observation."""
|
||||
flat_state = observation[OBS_STATE]
|
||||
flat_state = {key: value for key, value in observation.items() if key in self._state_ft}
|
||||
|
||||
state_vec = np.array(
|
||||
[flat_state.get(k, 0.0) for k in self._state_order],
|
||||
@@ -207,7 +203,11 @@ class LeKiwiClient(Robot):
|
||||
)
|
||||
|
||||
# Decode images
|
||||
image_observation = {k: v for k, v in observation.items() if k.startswith(OBS_IMAGES)}
|
||||
image_observation = {
|
||||
f"observation.images.{key}": value
|
||||
for key, value in observation.items()
|
||||
if key in self._cameras_ft
|
||||
}
|
||||
current_frames: Dict[str, np.ndarray] = {}
|
||||
for cam_name, image_b64 in image_observation.items():
|
||||
frame = self._decode_image_from_b64(image_b64)
|
||||
|
||||
@@ -22,8 +22,6 @@ import time
|
||||
import cv2
|
||||
import zmq
|
||||
|
||||
from lerobot.common.constants import OBS_IMAGES
|
||||
|
||||
from .config_lekiwi import LeKiwiConfig, LeKiwiHostConfig
|
||||
from .lekiwi import LeKiwi
|
||||
|
||||
@@ -95,12 +93,12 @@ def main():
|
||||
# Encode ndarrays to base64 strings
|
||||
for cam_key, _ in robot.cameras.items():
|
||||
ret, buffer = cv2.imencode(
|
||||
".jpg", last_observation[f"{OBS_IMAGES}.{cam_key}"], [int(cv2.IMWRITE_JPEG_QUALITY), 90]
|
||||
".jpg", last_observation[cam_key], [int(cv2.IMWRITE_JPEG_QUALITY), 90]
|
||||
)
|
||||
if ret:
|
||||
last_observation[f"{OBS_IMAGES}.{cam_key}"] = base64.b64encode(buffer).decode("utf-8")
|
||||
last_observation[cam_key] = base64.b64encode(buffer).decode("utf-8")
|
||||
else:
|
||||
last_observation[f"{OBS_IMAGES}.{cam_key}"] = ""
|
||||
last_observation[cam_key] = ""
|
||||
|
||||
# Send the observation to the remote agent
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user