Refactor kinematics and switch to using placo (#1322)

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Co-authored-by: Adil Zouitine <adilzouitinegm@gmail.com>
Co-authored-by: leo-berte <leonardo.bertelli96@gmail.com>
Michel Aractingi
2025-07-02 15:20:04 +02:00
committed by GitHub
parent 6047bbee10
commit 596e9050bd
10 changed files with 181 additions and 522 deletions

View File

@@ -24,6 +24,7 @@ This guide provides step-by-step instructions for training a robot policy using
- A gamepad (recommended) or keyboard to control the robot
- A Nvidia GPU
- A real robot with a follower and leader arm (optional if you use the keyboard or the gamepad)
- A URDF file for your robot, used by the kinematics package (see `lerobot/common/model/kinematics.py`); a quick sanity check is sketched below
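A quick way to validate the URDF before training is to load it with placo directly (a minimal sketch; the path is a placeholder for your own robot description):

import placo

robot = placo.RobotWrapper("path/to/so101_new_calib.urdf")  # placeholder path
print(robot.joint_names())  # the joints the kinematics solver will drive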
## What kind of tasks can I train?

View File

@@ -68,7 +68,6 @@ dependencies = [
"pyserial>=3.5",
"pyzmq>=26.2.1",
"rerun-sdk>=0.21.0",
"scipy>=1.14.0",
"termcolor>=2.4.0",
"torch>=2.2.1",
"torchcodec>=0.2.1; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')",
@@ -87,6 +86,7 @@ dora = [
dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0"]
gamepad = ["pygame>=2.5.1", "hidapi>=0.14.0"]
kinematics = ["placo>=0.9.6"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'",
@@ -100,7 +100,7 @@ stretch = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'"
]
test = ["pytest>=8.1.0", "pytest-timeout>=2.4.0", "pytest-cov>=5.0.0", "pyserial>=3.5", "mock-serial>=0.0.1 ; sys_platform != 'win32'"]
hilserl = ["transformers>=4.50.3", "gym-hil>=0.1.8", "protobuf>=5.29.3", "grpcio==1.71.0"]
hilserl = ["transformers>=4.50.3", "gym-hil>=0.1.9", "protobuf>=5.29.3", "grpcio==1.71.0", "placo>=0.9.6"]
umi = ["imagecodecs>=2024.1.1"]
video_benchmark = ["scikit-image>=0.23.2", "pandas>=2.2.2"]
xarm = ["gym-xarm>=0.1.1 ; python_version < '4.0'"]
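Splitting placo into the `kinematics` extra keeps the core install light; the code guards the import accordingly. A minimal sketch of that guarded-import pattern (the install hint string is illustrative):

try:
    import placo  # provided by the `kinematics` extra
except ImportError as e:
    raise ImportError("Missing optional dependency; try: pip install 'lerobot[kinematics]'") from e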

View File

@@ -12,472 +12,117 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from numpy.typing import NDArray
from scipy.spatial.transform import Rotation
def skew_symmetric(w: NDArray[np.float32]) -> NDArray[np.float32]:
"""Creates the skew-symmetric matrix from a 3D vector."""
return np.array([[0, -w[2], w[1]], [w[2], 0, -w[0]], [-w[1], w[0], 0]])
def rodrigues_rotation(w: NDArray[np.float32], theta: float) -> NDArray[np.float32]:
"""Computes the rotation matrix using Rodrigues' formula."""
w_hat = skew_symmetric(w)
return np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat
def screw_axis_to_transform(s: NDArray[np.float32], theta: float) -> NDArray[np.float32]:
"""Converts a screw axis to a 4x4 transformation matrix."""
screw_axis_rot = s[:3]
screw_axis_trans = s[3:]
# Pure translation
if np.allclose(screw_axis_rot, 0) and np.linalg.norm(screw_axis_trans) == 1:
transform = np.eye(4)
transform[:3, 3] = screw_axis_trans * theta
# Rotation (and potentially translation)
elif np.linalg.norm(screw_axis_rot) == 1:
w_hat = skew_symmetric(screw_axis_rot)
rot_mat = np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat
t = (
np.eye(3) * theta + (1 - np.cos(theta)) * w_hat + (theta - np.sin(theta)) * w_hat @ w_hat
) @ screw_axis_trans
transform = np.eye(4)
transform[:3, :3] = rot_mat
transform[:3, 3] = t
else:
raise ValueError("Invalid screw axis parameters")
return transform
def pose_difference_se3(pose1: NDArray[np.float32], pose2: NDArray[np.float32]) -> NDArray[np.float32]:
"""
Calculates the SE(3) difference between two 4x4 homogeneous transformation matrices.
SE(3) (Special Euclidean Group) represents rigid body transformations in 3D space,
combining rotation (SO(3)) and translation.
Each 4x4 matrix has the following structure:
[R11 R12 R13 tx]
[R21 R22 R23 ty]
[R31 R32 R33 tz]
[ 0 0 0 1]
where R is the 3x3 rotation matrix and [tx,ty,tz] is the translation vector.
Args:
pose1: A 4x4 numpy array representing the first pose.
pose2: A 4x4 numpy array representing the second pose.
Returns:
A 6D numpy array concatenating translation and rotation differences.
First 3 elements are the translational difference (position).
Last 3 elements are the rotational difference in axis-angle representation.
"""
rot1 = pose1[:3, :3]
rot2 = pose2[:3, :3]
translation_diff = pose1[:3, 3] - pose2[:3, 3]
# Calculate rotational difference using scipy's Rotation library
rot_diff = Rotation.from_matrix(rot1 @ rot2.T)
rotation_diff = rot_diff.as_rotvec() # Axis-angle representation
return np.concatenate([translation_diff, rotation_diff])
def se3_error(target_pose: NDArray[np.float32], current_pose: NDArray[np.float32]) -> NDArray[np.float32]:
pos_error = target_pose[:3, 3] - current_pose[:3, 3]
rot_target = target_pose[:3, :3]
rot_current = current_pose[:3, :3]
rot_error_mat = rot_target @ rot_current.T
rot_error = Rotation.from_matrix(rot_error_mat).as_rotvec()
return np.concatenate([pos_error, rot_error])
class RobotKinematics:
"""Robot kinematics class supporting multiple robot models."""
"""Robot kinematics using placo library for forward and inverse kinematics."""
# Robot measurements dictionary
ROBOT_MEASUREMENTS = {
"koch": {
"gripper": [0.239, -0.001, 0.024],
"wrist": [0.209, 0, 0.024],
"forearm": [0.108, 0, 0.02],
"humerus": [0, 0, 0.036],
"shoulder": [0, 0, 0],
"base": [0, 0, 0.02],
},
"moss": {
"gripper": [0.246, 0.013, 0.111],
"wrist": [0.245, 0.002, 0.064],
"forearm": [0.122, 0, 0.064],
"humerus": [0.001, 0.001, 0.063],
"shoulder": [0, 0, 0],
"base": [0, 0, 0.02],
},
"so_old_calibration": {
"gripper": [0.320, 0, 0.050],
"wrist": [0.278, 0, 0.050],
"forearm": [0.143, 0, 0.044],
"humerus": [0.031, 0, 0.072],
"shoulder": [0, 0, 0],
"base": [0, 0, 0.02],
},
"so_new_calibration": {
"gripper": [0.33, 0.0, 0.285],
"wrist": [0.30, 0.0, 0.267],
"forearm": [0.25, 0.0, 0.266],
"humerus": [0.06, 0.0, 0.264],
"shoulder": [0.0, 0.0, 0.238],
"base": [0.0, 0.0, 0.12],
},
}
def __init__(self, robot_type: str = "so100"):
"""Initialize kinematics for the specified robot type.
Args:
robot_type: String specifying the robot model ("koch", "so100", or "moss")
"""
if robot_type not in self.ROBOT_MEASUREMENTS:
raise ValueError(
f"Unknown robot type: {robot_type}. Available types: {list(self.ROBOT_MEASUREMENTS.keys())}"
)
self.robot_type = robot_type
self.measurements = self.ROBOT_MEASUREMENTS[robot_type]
# Initialize all transformation matrices and screw axes
self._setup_transforms()
def _create_translation_matrix(
self, x: float = 0.0, y: float = 0.0, z: float = 0.0
) -> NDArray[np.float32]:
"""Create a 4x4 translation matrix."""
return np.array([[1, 0, 0, x], [0, 1, 0, y], [0, 0, 1, z], [0, 0, 0, 1]])
def _setup_transforms(self):
"""Setup all transformation matrices and screw axes for the robot."""
# Set up rotation matrices (constant across robot types)
# Gripper orientation
self.gripper_X0 = np.array(
[
[1, 0, 0, 0],
[0, 0, 1, 0],
[0, -1, 0, 0],
[0, 0, 0, 1],
],
dtype=np.float32,
)
# Wrist orientation
self.wrist_X0 = np.array(
[
[0, -1, 0, 0],
[1, 0, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1],
],
dtype=np.float32,
)
# Base orientation
self.base_X0 = np.array(
[
[0, 0, 1, 0],
[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 0, 1],
],
dtype=np.float32,
)
# Gripper
# Screw axis of gripper frame wrt base frame
self.S_BG = np.array(
[
1,
0,
0,
0,
self.measurements["gripper"][2],
-self.measurements["gripper"][1],
],
dtype=np.float32,
)
# Gripper origin to centroid transform
self.X_GoGc = self._create_translation_matrix(x=0.07)
# Gripper origin to tip transform
self.X_GoGt = self._create_translation_matrix(x=0.12)
# 0-position gripper frame pose wrt base
self.X_BoGo = self._create_translation_matrix(
x=self.measurements["gripper"][0],
y=self.measurements["gripper"][1],
z=self.measurements["gripper"][2],
)
# Wrist
# Screw axis of wrist frame wrt base frame
self.S_BR = np.array(
[0, 1, 0, -self.measurements["wrist"][2], 0, self.measurements["wrist"][0]], dtype=np.float32
)
# 0-position origin to centroid transform
self.X_RoRc = self._create_translation_matrix(x=0.0035, y=-0.002)
# 0-position wrist frame pose wrt base
self.X_BR = self._create_translation_matrix(
x=self.measurements["wrist"][0],
y=self.measurements["wrist"][1],
z=self.measurements["wrist"][2],
)
# Forearm
# Screw axis of forearm frame wrt base frame
self.S_BF = np.array(
[
0,
1,
0,
-self.measurements["forearm"][2],
0,
self.measurements["forearm"][0],
],
dtype=np.float32,
)
# Forearm origin + centroid transform
self.X_ForearmFc = self._create_translation_matrix(x=0.036)
# 0-position forearm frame pose wrt base
self.X_BF = self._create_translation_matrix(
x=self.measurements["forearm"][0],
y=self.measurements["forearm"][1],
z=self.measurements["forearm"][2],
)
# Humerus
# Screw axis of humerus frame wrt base frame
self.S_BH = np.array(
[
0,
-1,
0,
self.measurements["humerus"][2],
0,
-self.measurements["humerus"][0],
],
dtype=np.float32,
)
# Humerus origin to centroid transform
self.X_HoHc = self._create_translation_matrix(x=0.0475)
# 0-position humerus frame pose wrt base
self.X_BH = self._create_translation_matrix(
x=self.measurements["humerus"][0],
y=self.measurements["humerus"][1],
z=self.measurements["humerus"][2],
)
# Shoulder
# Screw axis of shoulder frame wrt Base frame
self.S_BS = np.array([0, 0, -1, 0, 0, 0], dtype=np.float32)
# Shoulder origin to centroid transform
self.X_SoSc = self._create_translation_matrix(x=-0.017, z=0.0235)
# 0-position shoulder frame pose wrt base
self.X_BS = self._create_translation_matrix(
x=self.measurements["shoulder"][0],
y=self.measurements["shoulder"][1],
z=self.measurements["shoulder"][2],
)
# Base
# Base origin to centroid transform
self.X_BoBc = self._create_translation_matrix(y=0.015)
# World to base transform
self.X_WoBo = self._create_translation_matrix(
x=self.measurements["base"][0],
y=self.measurements["base"][1],
z=self.measurements["base"][2],
)
# Pre-compute gripper post-multiplication matrix
self._fk_gripper_post = self.X_GoGc @ self.X_BoGo @ self.gripper_X0
def forward_kinematics(
    self,
    robot_pos_deg: NDArray[np.float32],
    frame: str = "gripper_tip",
) -> NDArray[np.float32]:
    """Generic forward kinematics.
    Args:
        robot_pos_deg: Joint positions in degrees. Can be ``None`` when
            computing the *base* frame as it does not depend on joint
            angles.
        frame: Target frame. One of
            ``{"base", "shoulder", "humerus", "forearm", "wrist", "gripper", "gripper_tip"}``.
    Returns
    -------
    NDArray[np.float32]
        4×4 homogeneous transformation matrix of the requested frame
        expressed in the world coordinate system.
    """
    frame = frame.lower()
    if frame not in {
        "base",
        "shoulder",
        "humerus",
        "forearm",
        "wrist",
        "gripper",
        "gripper_tip",
    }:
        raise ValueError(
            f"Unknown frame '{frame}'. Valid options are base, shoulder, humerus, forearm, wrist, gripper, gripper_tip."
        )
    # Base frame does not rely on joint angles.
    if frame == "base":
        return self.X_WoBo @ self.X_BoBc @ self.base_X0
    robot_pos_rad = robot_pos_deg / 180 * np.pi
    # Extract joint angles (note the sign convention for shoulder lift).
    theta_shoulder_pan = robot_pos_rad[0]
    theta_shoulder_lift = -robot_pos_rad[1]
    theta_elbow_flex = robot_pos_rad[2]
    theta_wrist_flex = robot_pos_rad[3]
    theta_wrist_roll = robot_pos_rad[4]
    # Start with the world-to-base transform; incrementally add successive links.
    transformation_matrix = self.X_WoBo @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
    if frame == "shoulder":
        return transformation_matrix @ self.X_SoSc @ self.X_BS
    transformation_matrix = transformation_matrix @ screw_axis_to_transform(
        self.S_BH, theta_shoulder_lift
    )
    if frame == "humerus":
        return transformation_matrix @ self.X_HoHc @ self.X_BH
    transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
    if frame == "forearm":
        return transformation_matrix @ self.X_ForearmFc @ self.X_BF
    transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
    if frame == "wrist":
        return transformation_matrix @ self.X_RoRc @ self.X_BR @ self.wrist_X0
    transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BG, theta_wrist_roll)
    if frame == "gripper":
        return transformation_matrix @ self._fk_gripper_post
    else:  # frame == "gripper_tip"
        return transformation_matrix @ self.X_GoGt @ self.X_BoGo @ self.gripper_X0
def compute_jacobian(
    self, robot_pos_deg: NDArray[np.float32], frame: str = "gripper_tip"
) -> NDArray[np.float32]:
    """Finite differences to compute the Jacobian.
    J(i, j) represents how the ith component of the end-effector's velocity changes wrt a small change
    in the jth joint's velocity.
    Args:
        robot_pos_deg: Current joint positions in degrees
        fk_func: Forward kinematics function to use (defaults to fk_gripper)
    """
    eps = 1e-8
    jac = np.zeros(shape=(6, 5))
    delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64)
    for el_ix in range(len(robot_pos_deg[:-1])):
        delta *= 0
        delta[el_ix] = eps / 2
        sdot = (
            pose_difference_se3(
                self.forward_kinematics(robot_pos_deg[:-1] + delta, frame),
                self.forward_kinematics(robot_pos_deg[:-1] - delta, frame),
            )
            / eps
        )
        jac[:, el_ix] = sdot
    return jac
def compute_positional_jacobian(
    self, robot_pos_deg: NDArray[np.float32], frame: str = "gripper_tip"
) -> NDArray[np.float32]:
    """Finite differences to compute the positional Jacobian.
    J(i, j) represents how the ith component of the end-effector's position changes wrt a small change
    in the jth joint's velocity.
    Args:
        robot_pos_deg: Current joint positions in degrees
        fk_func: Forward kinematics function to use (defaults to fk_gripper)
    """
    eps = 1e-8
    jac = np.zeros(shape=(3, 5))
    delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64)
    for el_ix in range(len(robot_pos_deg[:-1])):
        delta *= 0
        delta[el_ix] = eps / 2
        sdot = (
            self.forward_kinematics(robot_pos_deg[:-1] + delta, frame)[:3, 3]
            - self.forward_kinematics(robot_pos_deg[:-1] - delta, frame)[:3, 3]
        ) / eps
        jac[:, el_ix] = sdot
    return jac
def ik(
    self,
    current_joint_pos: NDArray[np.float32],
    desired_ee_pose: NDArray[np.float32],
    position_only: bool = True,
    frame: str = "gripper_tip",
    max_iterations: int = 5,
    learning_rate: float = 1,
) -> NDArray[np.float32]:
    """Inverse kinematics using gradient descent.
    Args:
        current_joint_pos: Initial joint positions in degrees
        desired_ee_pose: Target end-effector pose as a 4x4 transformation matrix
        position_only: If True, only match end-effector position, not orientation
        frame: Target frame. One of
            ``{"base", "shoulder", "humerus", "forearm", "wrist", "gripper", "gripper_tip"}``.
        max_iterations: Maximum number of iterations to run
        learning_rate: Learning rate for gradient descent
    Returns:
        Joint positions in degrees that achieve the desired end-effector pose
    """
    # Do gradient descent.
    current_joint_state = current_joint_pos.copy()
    for _ in range(max_iterations):
        current_ee_pose = self.forward_kinematics(current_joint_state, frame)
        if not position_only:
            error = se3_error(desired_ee_pose, current_ee_pose)
            jac = self.compute_jacobian(current_joint_state, frame)
        else:
            error = desired_ee_pose[:3, 3] - current_ee_pose[:3, 3]
            jac = self.compute_positional_jacobian(current_joint_state, frame)
        delta_angles = np.linalg.pinv(jac) @ error
        current_joint_state[:-1] += learning_rate * delta_angles
        if np.linalg.norm(error) < 5e-3:
            return current_joint_state
    return current_joint_state
def __init__(
    self,
    urdf_path: str,
    target_frame_name: str = "gripper_frame_link",
    joint_names: list[str] = None,
):
    """
    Initialize the placo-based kinematics solver.
    Args:
        urdf_path: Path to the robot URDF file
        target_frame_name: Name of the end-effector frame in the URDF
        joint_names: List of joint names to use for the kinematics solver
    """
    try:
        import placo
    except ImportError as e:
        raise ImportError(
            "placo is required for RobotKinematics. "
            "Please install the optional dependencies of `kinematics` in the package."
        ) from e
    self.robot = placo.RobotWrapper(urdf_path)
    self.solver = placo.KinematicsSolver(self.robot)
    self.solver.mask_fbase(True)  # Fix the base
    self.target_frame_name = target_frame_name
    # Set joint names
    self.joint_names = list(self.robot.joint_names()) if joint_names is None else joint_names
    # Initialize frame task for IK
    self.tip_frame = self.solver.add_frame_task(self.target_frame_name, np.eye(4))
def forward_kinematics(self, joint_pos_deg):
    """
    Compute forward kinematics for the given joint configuration, using the target frame name set in the constructor.
    Args:
        joint_pos_deg: Joint positions in degrees (numpy array)
    Returns:
        4x4 transformation matrix of the end-effector pose
    """
    # Convert degrees to radians
    joint_pos_rad = np.deg2rad(joint_pos_deg[: len(self.joint_names)])
    # Update joint positions in the placo robot
    for i, joint_name in enumerate(self.joint_names):
        self.robot.set_joint(joint_name, joint_pos_rad[i])
    # Update kinematics
    self.robot.update_kinematics()
    # Get the transformation matrix
    return self.robot.get_T_world_frame(self.target_frame_name)
def inverse_kinematics(
    self, current_joint_pos, desired_ee_pose, position_weight=1.0, orientation_weight=0.01
):
    """
    Compute inverse kinematics using the placo solver.
    Args:
        current_joint_pos: Current joint positions in degrees (used as initial guess)
        desired_ee_pose: Target end-effector pose as a 4x4 transformation matrix
        position_weight: Weight for the position constraint in IK
        orientation_weight: Weight for the orientation constraint in IK; set to 0.0 to constrain position only
    Returns:
        Joint positions in degrees that achieve the desired end-effector pose
    """
    # Convert current joint positions to radians for the initial guess
    current_joint_rad = np.deg2rad(current_joint_pos[: len(self.joint_names)])
    # Set current joint positions as the initial guess
    for i, joint_name in enumerate(self.joint_names):
        self.robot.set_joint(joint_name, current_joint_rad[i])
    # Update the target pose for the frame task
    self.tip_frame.T_world_frame = desired_ee_pose
    # Configure the task with the position/orientation weights
    self.tip_frame.configure(self.target_frame_name, "soft", position_weight, orientation_weight)
    # Solve IK
    self.solver.solve(True)
    self.robot.update_kinematics()
    # Extract joint positions
    joint_pos_rad = []
    for joint_name in self.joint_names:
        joint = self.robot.get_joint(joint_name)
        joint_pos_rad.append(joint)
    # Convert back to degrees
    joint_pos_deg = np.rad2deg(joint_pos_rad)
    # Preserve the gripper position if present in current_joint_pos
    if len(current_joint_pos) > len(self.joint_names):
        result = np.zeros_like(current_joint_pos)
        result[: len(self.joint_names)] = joint_pos_deg
        result[len(self.joint_names) :] = current_joint_pos[len(self.joint_names) :]
        return result
    else:
        return joint_pos_deg
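Taken together, a typical round trip through the new API looks like the following sketch (the URDF path is a placeholder, and the six-element joint vector assumes five arm joints plus a gripper entry; trailing entries beyond the solver's joints are passed through unchanged):

import numpy as np

from lerobot.model.kinematics import RobotKinematics

kin = RobotKinematics(urdf_path="so101_new_calib.urdf", target_frame_name="gripper_frame_link")
joints_deg = np.zeros(6)  # five arm joints + gripper, in degrees
ee_pose = kin.forward_kinematics(joints_deg)  # 4x4 pose of the target frame
target = ee_pose.copy()
target[2, 3] += 0.01  # ask for the end effector 1 cm higher
new_joints_deg = kin.inverse_kinematics(joints_deg, target)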

View File

@@ -44,6 +44,14 @@ class SO100FollowerConfig(RobotConfig):
class SO100FollowerEndEffectorConfig(SO100FollowerConfig):
"""Configuration for the SO100FollowerEndEffector robot."""
# Path to URDF file for kinematics
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
urdf_path: str | None = None
# End-effector frame name in the URDF
target_frame_name: str = "gripper_frame_link"
# Default bounds for the end-effector position (in meters)
end_effector_bounds: dict[str, list[float]] = field(
default_factory=lambda: {

View File

@@ -30,7 +30,6 @@ from . import SO100Follower
from .config_so100_follower import SO100FollowerEndEffectorConfig
logger = logging.getLogger(__name__)
EE_FRAME = "gripper_tip"
class SO100FollowerEndEffector(SO100Follower):
@@ -64,7 +63,16 @@ class SO100FollowerEndEffector(SO100Follower):
self.config = config
# Initialize the kinematics module for the so100 robot
self.kinematics = RobotKinematics(robot_type="so_new_calibration")
if self.config.urdf_path is None:
raise ValueError(
"urdf_path must be provided in the configuration for end-effector control. "
"Please set urdf_path in your SO100FollowerEndEffectorConfig."
)
self.kinematics = RobotKinematics(
urdf_path=self.config.urdf_path,
target_frame_name=self.config.target_frame_name,
)
# Store the bounds for end-effector position
self.end_effector_bounds = self.config.end_effector_bounds
@@ -126,7 +134,7 @@ class SO100FollowerEndEffector(SO100Follower):
# Calculate current end-effector position using forward kinematics
if self.current_ee_pos is None:
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos, frame=EE_FRAME)
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos)
# Set desired end-effector position by adding delta
desired_ee_pos = np.eye(4)
@@ -142,11 +150,10 @@ class SO100FollowerEndEffector(SO100Follower):
)
# Compute inverse kinematics to get joint positions
target_joint_values_in_degrees = self.kinematics.ik(
self.current_joint_pos, desired_ee_pos, position_only=True, frame=EE_FRAME
target_joint_values_in_degrees = self.kinematics.inverse_kinematics(
self.current_joint_pos, desired_ee_pos
)
target_joint_values_in_degrees = np.clip(target_joint_values_in_degrees, -180.0, 180.0)
# Create joint space action dictionary
joint_action = {
f"{key}.pos": target_joint_values_in_degrees[i] for i, key in enumerate(self.bus.motors.keys())

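Wiring the new config fields together might look like this sketch (the import path and serial port are illustrative and may differ across lerobot versions):

from lerobot.robots.so100_follower import (  # import path may vary
    SO100FollowerEndEffector,
    SO100FollowerEndEffectorConfig,
)

config = SO100FollowerEndEffectorConfig(
    port="/dev/ttyACM0",  # illustrative serial port
    urdf_path="SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
    target_frame_name="gripper_frame_link",
)
robot = SO100FollowerEndEffector(config)  # raises ValueError if urdf_path is None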
View File

@@ -36,6 +36,7 @@ from dataclasses import dataclass
import draccus
import numpy as np
from lerobot.common.utils.robot_utils import busy_wait
from lerobot.model.kinematics import RobotKinematics
from lerobot.robots import ( # noqa: F401
RobotConfig,
@@ -76,12 +77,12 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
# Note to be compatible with the rest of the codebase,
# we are using the new calibration method for so101 and so100
robot_type = "so_new_calibration"
kinematics = RobotKinematics(robot_type=robot_type)
kinematics = RobotKinematics(cfg.robot.urdf_path, cfg.robot.target_frame_name)
# Initialize min/max values
observation = robot.get_observation()
joint_positions = np.array([observation[f"{key}.pos"] for key in robot.bus.motors])
ee_pos = kinematics.forward_kinematics(joint_positions, frame="gripper_tip")[:3, 3]
ee_pos = kinematics.forward_kinematics(joint_positions)[:3, 3]
max_pos = joint_positions.copy()
min_pos = joint_positions.copy()
@@ -94,7 +95,7 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
observation = robot.get_observation()
joint_positions = np.array([observation[f"{key}.pos"] for key in robot.bus.motors])
ee_pos = kinematics.forward_kinematics(joint_positions, frame="gripper_tip")[:3, 3]
ee_pos = kinematics.forward_kinematics(joint_positions)[:3, 3]
# Skip initial warmup period
if (time.perf_counter() - start_episode_t) < 5:
@@ -113,6 +114,8 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
print(f"Min joint pos position {np.round(min_pos, 4).tolist()}")
break
busy_wait(0.01)
if __name__ == "__main__":
find_joint_and_ee_bounds()
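The repeated `[:3, 3]` indexing relies on the structure of a homogeneous transform: the last column of the 4x4 matrix holds the xyz translation, so slicing it out yields the end-effector position in meters. For example:

import numpy as np

T = np.eye(4)
T[:3, 3] = [0.25, 0.0, 0.12]  # a pose 25 cm forward and 12 cm up
ee_pos = T[:3, 3]  # array([0.25, 0.  , 0.12])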

View File

@@ -21,8 +21,7 @@ from pathlib import Path
from typing import Dict, Tuple
import cv2
# import torch.nn.functional as F # noqa: N812
import torch
import torchvision.transforms.functional as F # type: ignore # noqa: N812
from tqdm import tqdm # type: ignore
@@ -224,7 +223,8 @@ def convert_lerobot_dataset_to_cropper_lerobot_dataset(
cropped = F.crop(value, top, left, height, width)
value = F.resize(cropped, resize_size)
value = value.clamp(0, 1)
if key.startswith("complementary_info") and isinstance(value, torch.Tensor) and value.dim() == 0:
value = value.unsqueeze(0)
new_frame[key] = value
new_dataset.add_frame(new_frame, task=task)
@@ -265,8 +265,7 @@ if __name__ == "__main__":
)
parser.add_argument(
"--push-to-hub",
type=bool,
default=False,
action="store_true",
help="Whether to push the new dataset to the hub.",
)
parser.add_argument(

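The `--push-to-hub` change fixes a classic argparse pitfall: with `type=bool`, the parser calls `bool()` on the raw string, and since `bool("False")` is `True`, passing `--push-to-hub False` still enabled pushing. A `store_true` flag behaves as intended:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--push-to-hub", action="store_true")
print(parser.parse_args([]).push_to_hub)                 # False
print(parser.parse_args(["--push-to-hub"]).push_to_hub)  # True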
View File

@@ -254,20 +254,19 @@ class RobotEnv(gym.Env):
self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors]
self._image_keys = self.robot.cameras.keys()
# Read initial joint positions using the bus
self.current_joint_positions = self._get_observation()["agent_pos"]
self.current_observation = None
self.use_gripper = use_gripper
self._setup_spaces()
def _get_observation(self) -> np.ndarray:
def _get_observation(self) -> dict[str, np.ndarray]:
"""Helper to convert a dictionary from bus.sync_read to an ordered numpy array."""
obs_dict = self.robot.get_observation()
joint_positions = np.array([obs_dict[name] for name in self._joint_names], dtype=np.float32)
joint_positions = np.array([obs_dict[name] for name in self._joint_names])
images = {key: obs_dict[key] for key in self._image_keys}
return {"agent_pos": joint_positions, "pixels": images}
self.current_observation = {"agent_pos": joint_positions, "pixels": images}
def _setup_spaces(self):
"""
@@ -281,24 +280,24 @@ class RobotEnv(gym.Env):
- The action space is defined as a Box space representing joint position commands. It is defined as relative (delta)
or absolute, based on the configuration.
"""
example_obs = self._get_observation()
self._get_observation()
observation_spaces = {}
# Define observation spaces for images and other states.
if "pixels" in example_obs:
prefix = "observation.images" if len(example_obs["pixels"]) > 1 else "observation.image"
if "pixels" in self.current_observation:
prefix = "observation.images"
observation_spaces = {
f"{prefix}.{key}": gym.spaces.Box(
low=0, high=255, shape=example_obs["pixels"][key].shape, dtype=np.uint8
low=0, high=255, shape=self.current_observation["pixels"][key].shape, dtype=np.uint8
)
for key in example_obs["pixels"]
for key in self.current_observation["pixels"]
}
observation_spaces["observation.state"] = gym.spaces.Box(
low=0,
high=10,
shape=example_obs["agent_pos"].shape,
shape=self.current_observation["agent_pos"].shape,
dtype=np.float32,
)
@@ -340,14 +339,12 @@ class RobotEnv(gym.Env):
self.robot.reset()
# Capture the initial observation.
observation = self._get_observation()
# Reset episode tracking variables.
self.current_step = 0
self.episode_data = None
return observation, {"is_intervention": False}
self.current_observation = None
self._get_observation()
return self.current_observation, {"is_intervention": False}
def step(self, action) -> tuple[dict[str, np.ndarray], float, bool, bool, dict[str, Any]]:
"""
@@ -367,8 +364,6 @@ class RobotEnv(gym.Env):
- truncated (bool): True if the episode was truncated (e.g., time constraints).
- info (dict): Additional debugging information including intervention status.
"""
self.current_joint_positions = self._get_observation()["agent_pos"]
action_dict = {"delta_x": action[0], "delta_y": action[1], "delta_z": action[2]}
# 1.0 action corresponds to no-op action
@@ -376,6 +371,8 @@ class RobotEnv(gym.Env):
self.robot.send_action(action_dict)
self._get_observation()
if self.display_cameras:
self.render()
@@ -386,7 +383,7 @@ class RobotEnv(gym.Env):
truncated = False
return (
self._get_observation(),
self.current_observation,
reward,
terminated,
truncated,
@@ -399,11 +396,10 @@ class RobotEnv(gym.Env):
"""
import cv2
observation = self._get_observation()
image_keys = [key for key in observation if "image" in key]
image_keys = [key for key in self.current_observation if "image" in key]
for key in image_keys:
cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
cv2.imshow(key, cv2.cvtColor(self.current_observation[key].numpy(), cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
def close(self):
@@ -520,7 +516,10 @@ class AddCurrentToObservation(gym.ObservationWrapper):
Returns:
The modified observation with current values.
"""
present_current_observation = self.unwrapped._get_observation()["agent_pos"]
present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current")
present_current_observation = np.array(
[present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors]
)
observation["agent_pos"] = np.concatenate(
[observation["agent_pos"], present_current_observation], axis=-1
)
@@ -1090,13 +1089,10 @@ class EEObservationWrapper(gym.ObservationWrapper):
dtype=np.float32,
)
# Initialize kinematics instance for the appropriate robot type
robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so101")
if "so100" in robot_type or "so101" in robot_type:
# Note to be compatible with the rest of the codebase,
# we are using the new calibration method for so101 and so100
robot_type = "so_new_calibration"
self.kinematics = RobotKinematics(robot_type)
self.kinematics = RobotKinematics(
urdf_path=env.unwrapped.robot.config.urdf_path,
target_frame_name=env.unwrapped.robot.config.target_frame_name,
)
def observation(self, observation):
"""
@@ -1108,9 +1104,9 @@ class EEObservationWrapper(gym.ObservationWrapper):
Returns:
Enhanced observation with end-effector pose information.
"""
current_joint_pos = self.unwrapped._get_observation()["agent_pos"]
current_joint_pos = self.unwrapped.current_observation["agent_pos"]
current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos, frame="gripper_tip")[:3, 3]
current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos)[:3, 3]
observation["agent_pos"] = np.concatenate([observation["agent_pos"], current_ee_pos], -1)
return observation
@@ -1157,12 +1153,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.event_lock = Lock() # Thread-safe access to events
# Initialize robot control
robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so101")
if "so100" in robot_type or "so101" in robot_type:
# Note to be compatible with the rest of the codebase,
# we are using the new calibration method for so101 and so100
robot_type = "so_new_calibration"
self.kinematics = RobotKinematics(robot_type)
self.kinematics = RobotKinematics(
urdf_path=env.unwrapped.robot.config.urdf_path,
target_frame_name=env.unwrapped.robot.config.target_frame_name,
)
self.leader_torque_enabled = True
self.prev_leader_gripper = None
@@ -1260,14 +1254,14 @@ class BaseLeaderControlWrapper(gym.Wrapper):
leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position")
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict], dtype=np.float32)
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict], dtype=np.float32)
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict])
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict])
self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - leader_pos[:-1]))
# [:3, 3] Last column of the transformation matrix corresponds to the xyz translation
leader_ee = self.kinematics.forward_kinematics(leader_pos, frame="gripper_tip")[:3, 3]
follower_ee = self.kinematics.forward_kinematics(follower_pos, frame="gripper_tip")[:3, 3]
leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3]
follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3]
action = np.clip(leader_ee - follower_ee, -self.end_effector_step_sizes, self.end_effector_step_sizes)
# Normalize the action to the range [-1, 1]
@@ -1341,6 +1335,9 @@ class BaseLeaderControlWrapper(gym.Wrapper):
# NOTE:
obs, reward, terminated, truncated, info = self.env.step(action)
if isinstance(action, np.ndarray):
action = torch.from_numpy(action)
# Add intervention info
info["is_intervention"] = is_intervention
info["action_intervention"] = action
@@ -1877,7 +1874,6 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
if cfg.robot is None:
raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
robot = make_robot_from_config(cfg.robot)
teleop_device = make_teleoperator_from_config(cfg.teleop)
teleop_device.connect()
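After this refactor, `_get_observation()` refreshes `self.current_observation` in place and `step()` returns the cached dict, so the motor bus is read once per step instead of twice. A sketch of the resulting control loop (assumes an already-constructed env; the action layout with gripper is illustrative):

import numpy as np

obs, info = env.reset()
action = np.array([0.0, 0.0, 0.01, 1.0])  # dx, dy, dz, gripper no-op
obs, reward, terminated, truncated, info = env.step(action)
images = obs["pixels"]          # camera frames keyed by camera name
joint_state = obs["agent_pos"]  # refreshed by the same bus read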

View File

@@ -295,8 +295,8 @@ class GamepadController(InputController):
try:
# Read joystick axes
# Left stick X and Y (typically axes 0 and 1)
x_input = self.joystick.get_axis(0) # Left/Right
y_input = self.joystick.get_axis(1) # Up/Down (often inverted)
y_input = self.joystick.get_axis(0) # Left/Right
x_input = self.joystick.get_axis(1) # Up/Down (often inverted)
# Right stick Y (typically axis 3 or 4)
z_input = self.joystick.get_axis(3) # Up/Down for Z
@@ -307,8 +307,8 @@ class GamepadController(InputController):
z_input = 0 if abs(z_input) < self.deadzone else z_input
# Calculate deltas (note: may need to invert axes depending on controller)
delta_x = -y_input * self.y_step_size # Forward/backward
delta_y = -x_input * self.x_step_size # Left/right
delta_x = -x_input * self.x_step_size # Forward/backward
delta_y = -y_input * self.y_step_size # Left/right
delta_z = -z_input * self.z_step_size # Up/down
return delta_x, delta_y, delta_z
@@ -424,14 +424,14 @@ class GamepadControllerHID(InputController):
# These offsets are for the Logitech RumblePad 2
if data and len(data) >= 8:
# Normalize joystick values from 0-255 to -1.0-1.0
self.left_x = (data[1] - 128) / 128.0
self.left_y = (data[2] - 128) / 128.0
self.left_y = (data[1] - 128) / 128.0
self.left_x = (data[2] - 128) / 128.0
self.right_x = (data[3] - 128) / 128.0
self.right_y = (data[4] - 128) / 128.0
# Apply deadzone
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
@@ -465,8 +465,8 @@ class GamepadControllerHID(InputController):
def get_deltas(self):
"""Get the current movement deltas from gamepad state."""
# Calculate deltas - invert as needed based on controller orientation
delta_x = -self.left_y * self.x_step_size # Forward/backward
delta_y = -self.left_x * self.y_step_size # Left/right
delta_x = -self.left_x * self.x_step_size # Forward/backward
delta_y = -self.left_y * self.y_step_size # Left/right
delta_z = -self.right_y * self.z_step_size # Up/down
return delta_x, delta_y, delta_z
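Both backends now share one convention: the stick's vertical axis drives delta_x (robot forward/backward) and the horizontal axis drives delta_y (left/right), each negated and scaled by a step size. A toy sketch of that mapping with a deadzone (the helper name is illustrative):

def stick_to_deltas(horizontal, vertical, step=0.01, deadzone=0.1):
    y_input = 0.0 if abs(horizontal) < deadzone else horizontal
    x_input = 0.0 if abs(vertical) < deadzone else vertical
    delta_x = -x_input * step  # forward/backward
    delta_y = -y_input * step  # left/right
    return delta_x, delta_y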

View File

@@ -196,17 +196,18 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
delta_x = 0.0
delta_y = 0.0
delta_z = 0.0
gripper_action = 1.0
# Generate action based on current key states
for key, val in self.current_pressed.items():
if key == keyboard.Key.up:
delta_x = int(val)
elif key == keyboard.Key.down:
delta_x = -int(val)
elif key == keyboard.Key.left:
delta_y = int(val)
elif key == keyboard.Key.right:
delta_y = -int(val)
elif key == keyboard.Key.down:
delta_y = int(val)
elif key == keyboard.Key.left:
delta_x = int(val)
elif key == keyboard.Key.right:
delta_x = -int(val)
elif key == keyboard.Key.shift:
delta_z = -int(val)
elif key == keyboard.Key.shift_r:
@@ -230,7 +231,6 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
"delta_z": delta_z,
}
gripper_action = 1 # default gripper action is to stay
if self.config.use_gripper:
action_dict["gripper"] = gripper_action