Refactor LeKiwi (#1136)
Co-authored-by: Simon Alibert <simon.alibert@huggingface.co> Co-authored-by: Steven Palma <steven.palma@huggingface.co> Co-authored-by: Steven Palma <imstevenpmwork@ieee.org> Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
@@ -16,9 +16,12 @@
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from itertools import chain
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
@@ -71,34 +74,35 @@ class LeKiwi(Robot):
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@property
|
||||
def state_feature(self) -> dict:
|
||||
state_ft = {
|
||||
"arm_shoulder_pan": {"dtype": "float32"},
|
||||
"arm_shoulder_lift": {"dtype": "float32"},
|
||||
"arm_elbow_flex": {"dtype": "float32"},
|
||||
"arm_wrist_flex": {"dtype": "float32"},
|
||||
"arm_wrist_roll": {"dtype": "float32"},
|
||||
"arm_gripper": {"dtype": "float32"},
|
||||
"base_left_wheel": {"dtype": "float32"},
|
||||
"base_right_wheel": {"dtype": "float32"},
|
||||
"base_back_wheel": {"dtype": "float32"},
|
||||
def _state_ft(self) -> dict[str, type]:
|
||||
return dict.fromkeys(
|
||||
(
|
||||
"arm_shoulder_pan.pos",
|
||||
"arm_shoulder_lift.pos",
|
||||
"arm_elbow_flex.pos",
|
||||
"arm_wrist_flex.pos",
|
||||
"arm_wrist_roll.pos",
|
||||
"arm_gripper.pos",
|
||||
"x.vel",
|
||||
"y.vel",
|
||||
"theta.vel",
|
||||
),
|
||||
float,
|
||||
)
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
return state_ft
|
||||
|
||||
@property
|
||||
def action_feature(self) -> dict:
|
||||
return self.state_feature
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._state_ft, **self._cameras_ft}
|
||||
|
||||
@property
|
||||
def camera_features(self) -> dict[str, dict]:
|
||||
cam_ft = {}
|
||||
for cam_key, cam in self.cameras.items():
|
||||
cam_ft[cam_key] = {
|
||||
"shape": (cam.height, cam.width, cam.channels),
|
||||
"names": ["height", "width", "channels"],
|
||||
"info": None,
|
||||
}
|
||||
return cam_ft
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._state_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
@@ -189,6 +193,142 @@ class LeKiwi(Robot):
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@staticmethod
|
||||
def _degps_to_raw(degps: float) -> int:
|
||||
steps_per_deg = 4096.0 / 360.0
|
||||
speed_in_steps = degps * steps_per_deg
|
||||
speed_int = int(round(speed_in_steps))
|
||||
# Cap the value to fit within signed 16-bit range (-32768 to 32767)
|
||||
if speed_int > 0x7FFF:
|
||||
speed_int = 0x7FFF # 32767 -> maximum positive value
|
||||
elif speed_int < -0x8000:
|
||||
speed_int = -0x8000 # -32768 -> minimum negative value
|
||||
return speed_int
|
||||
|
||||
@staticmethod
|
||||
def _raw_to_degps(raw_speed: int) -> float:
|
||||
steps_per_deg = 4096.0 / 360.0
|
||||
magnitude = raw_speed
|
||||
degps = magnitude / steps_per_deg
|
||||
return degps
|
||||
|
||||
def _body_to_wheel_raw(
|
||||
self,
|
||||
x: float,
|
||||
y: float,
|
||||
theta: float,
|
||||
wheel_radius: float = 0.05,
|
||||
base_radius: float = 0.125,
|
||||
max_raw: int = 3000,
|
||||
) -> dict:
|
||||
"""
|
||||
Convert desired body-frame velocities into wheel raw commands.
|
||||
|
||||
Parameters:
|
||||
x_cmd : Linear velocity in x (m/s).
|
||||
y_cmd : Linear velocity in y (m/s).
|
||||
theta_cmd : Rotational velocity (deg/s).
|
||||
wheel_radius: Radius of each wheel (meters).
|
||||
base_radius : Distance from the center of rotation to each wheel (meters).
|
||||
max_raw : Maximum allowed raw command (ticks) per wheel.
|
||||
|
||||
Returns:
|
||||
A dictionary with wheel raw commands:
|
||||
{"base_left_wheel": value, "base_back_wheel": value, "base_right_wheel": value}.
|
||||
|
||||
Notes:
|
||||
- Internally, the method converts theta_cmd to rad/s for the kinematics.
|
||||
- The raw command is computed from the wheels angular speed in deg/s
|
||||
using _degps_to_raw(). If any command exceeds max_raw, all commands
|
||||
are scaled down proportionally.
|
||||
"""
|
||||
# Convert rotational velocity from deg/s to rad/s.
|
||||
theta_rad = theta * (np.pi / 180.0)
|
||||
# Create the body velocity vector [x, y, theta_rad].
|
||||
velocity_vector = np.array([x, y, theta_rad])
|
||||
|
||||
# Define the wheel mounting angles with a -90° offset.
|
||||
angles = np.radians(np.array([240, 120, 0]) - 90)
|
||||
# Build the kinematic matrix: each row maps body velocities to a wheel’s linear speed.
|
||||
# The third column (base_radius) accounts for the effect of rotation.
|
||||
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
|
||||
|
||||
# Compute each wheel’s linear speed (m/s) and then its angular speed (rad/s).
|
||||
wheel_linear_speeds = m.dot(velocity_vector)
|
||||
wheel_angular_speeds = wheel_linear_speeds / wheel_radius
|
||||
|
||||
# Convert wheel angular speeds from rad/s to deg/s.
|
||||
wheel_degps = wheel_angular_speeds * (180.0 / np.pi)
|
||||
|
||||
# Scaling
|
||||
steps_per_deg = 4096.0 / 360.0
|
||||
raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps]
|
||||
max_raw_computed = max(raw_floats)
|
||||
if max_raw_computed > max_raw:
|
||||
scale = max_raw / max_raw_computed
|
||||
wheel_degps = wheel_degps * scale
|
||||
|
||||
# Convert each wheel’s angular speed (deg/s) to a raw integer.
|
||||
wheel_raw = [self._degps_to_raw(deg) for deg in wheel_degps]
|
||||
|
||||
return {
|
||||
"base_left_wheel": wheel_raw[0],
|
||||
"base_back_wheel": wheel_raw[1],
|
||||
"base_right_wheel": wheel_raw[2],
|
||||
}
|
||||
|
||||
def _wheel_raw_to_body(
|
||||
self,
|
||||
left_wheel_speed,
|
||||
back_wheel_speed,
|
||||
right_wheel_speed,
|
||||
wheel_radius: float = 0.05,
|
||||
base_radius: float = 0.125,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Convert wheel raw command feedback back into body-frame velocities.
|
||||
|
||||
Parameters:
|
||||
wheel_raw : Vector with raw wheel commands ("base_left_wheel", "base_back_wheel", "base_right_wheel").
|
||||
wheel_radius: Radius of each wheel (meters).
|
||||
base_radius : Distance from the robot center to each wheel (meters).
|
||||
|
||||
Returns:
|
||||
A dict (x_cmd, y_cmd, theta_cmd) where:
|
||||
OBS_STATE.x_cmd : Linear velocity in x (m/s).
|
||||
OBS_STATE.y_cmd : Linear velocity in y (m/s).
|
||||
OBS_STATE.theta_cmd : Rotational velocity in deg/s.
|
||||
"""
|
||||
|
||||
# Convert each raw command back to an angular speed in deg/s.
|
||||
wheel_degps = np.array(
|
||||
[
|
||||
self._raw_to_degps(left_wheel_speed),
|
||||
self._raw_to_degps(back_wheel_speed),
|
||||
self._raw_to_degps(right_wheel_speed),
|
||||
]
|
||||
)
|
||||
|
||||
# Convert from deg/s to rad/s.
|
||||
wheel_radps = wheel_degps * (np.pi / 180.0)
|
||||
# Compute each wheel’s linear speed (m/s) from its angular speed.
|
||||
wheel_linear_speeds = wheel_radps * wheel_radius
|
||||
|
||||
# Define the wheel mounting angles with a -90° offset.
|
||||
angles = np.radians(np.array([240, 120, 0]) - 90)
|
||||
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
|
||||
|
||||
# Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds.
|
||||
m_inv = np.linalg.inv(m)
|
||||
velocity_vector = m_inv.dot(wheel_linear_speeds)
|
||||
x, y, theta_rad = velocity_vector
|
||||
theta = theta_rad * (180.0 / np.pi)
|
||||
return {
|
||||
"x.vel": x,
|
||||
"y.vel": y,
|
||||
"theta.vel": theta,
|
||||
} # m/s and deg/s
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
@@ -196,9 +336,19 @@ class LeKiwi(Robot):
|
||||
# Read actuators position for arm and vel for base
|
||||
start = time.perf_counter()
|
||||
arm_pos = self.bus.sync_read("Present_Position", self.arm_motors)
|
||||
base_vel = self.bus.sync_read("Present_Velocity", self.base_motors)
|
||||
obs_dict = {**arm_pos, **base_vel}
|
||||
obs_dict = {f"{OBS_STATE}." + key: value for key, value in obs_dict.items()}
|
||||
base_wheel_vel = self.bus.sync_read("Present_Velocity", self.base_motors)
|
||||
|
||||
base_vel = self._wheel_raw_to_body(
|
||||
base_wheel_vel["base_left_wheel"],
|
||||
base_wheel_vel["base_back_wheel"],
|
||||
base_wheel_vel["base_right_wheel"],
|
||||
)
|
||||
|
||||
arm_state = {f"{k}.pos": v for k, v in arm_pos.items()}
|
||||
|
||||
flat_states = {**arm_state, **base_vel}
|
||||
|
||||
obs_dict = {f"{OBS_STATE}": flat_states}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
@@ -227,8 +377,12 @@ class LeKiwi(Robot):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
arm_goal_pos = {k: v for k, v in action.items() if k in self.arm_motors}
|
||||
base_goal_vel = {k: v for k, v in action.items() if k in self.base_motors}
|
||||
arm_goal_pos = {k: v for k, v in action.items() if k.endswith(".pos")}
|
||||
base_goal_vel = {k: v for k, v in action.items() if k.endswith(".vel")}
|
||||
|
||||
base_wheel_goal_vel = self._body_to_wheel_raw(
|
||||
base_goal_vel["x.vel"], base_goal_vel["y.vel"], base_goal_vel["theta.vel"]
|
||||
)
|
||||
|
||||
# Cap goal position when too far away from present position.
|
||||
# /!\ Slower fps expected due to reading from the follower.
|
||||
@@ -239,8 +393,9 @@ class LeKiwi(Robot):
|
||||
arm_goal_pos = arm_safe_goal_pos
|
||||
|
||||
# Send goal position to the actuators
|
||||
self.bus.sync_write("Goal_Position", arm_goal_pos)
|
||||
self.bus.sync_write("Goal_Velocity", base_goal_vel)
|
||||
arm_goal_pos_raw = {k.replace(".pos", ""): v for k, v in arm_goal_pos.items()}
|
||||
self.bus.sync_write("Goal_Position", arm_goal_pos_raw)
|
||||
self.bus.sync_write("Goal_Velocity", base_wheel_goal_vel)
|
||||
|
||||
return {**arm_goal_pos, **base_goal_vel}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user