From 596e9050bd396e8cc8abba1e4d7d8cb6f3b4cf49 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Wed, 2 Jul 2025 15:20:04 +0200 Subject: [PATCH] Refactor kinematics and switch to using placo (#1322) Co-authored-by: Caroline Pascal Co-authored-by: Adil Zouitine Co-authored-by: leo-berte --- docs/source/hilserl.mdx | 1 + pyproject.toml | 4 +- src/lerobot/model/kinematics.py | 539 +++--------------- .../so100_follower/config_so100_follower.py | 8 + .../so100_follower_end_effector.py | 19 +- src/lerobot/scripts/find_joint_limits.py | 9 +- src/lerobot/scripts/rl/crop_dataset_roi.py | 9 +- src/lerobot/scripts/rl/gym_manipulator.py | 82 ++- .../teleoperators/gamepad/gamepad_utils.py | 18 +- .../teleoperators/keyboard/teleop_keyboard.py | 14 +- 10 files changed, 181 insertions(+), 522 deletions(-) diff --git a/docs/source/hilserl.mdx b/docs/source/hilserl.mdx index 79b49c914..b3ab40c89 100644 --- a/docs/source/hilserl.mdx +++ b/docs/source/hilserl.mdx @@ -24,6 +24,7 @@ This guide provides step-by-step instructions for training a robot policy using - A gamepad (recommended) or keyboard to control the robot - A Nvidia GPU - A real robot with a follower and leader arm (optional if you use the keyboard or the gamepad) +- A URDF file for the robot for the kinematics package (check `lerobot/common/model/kinematics.py`) ## What kind of tasks can I train? diff --git a/pyproject.toml b/pyproject.toml index 5a1d7a1d1..9fc84d903 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,7 +68,6 @@ dependencies = [ "pyserial>=3.5", "pyzmq>=26.2.1", "rerun-sdk>=0.21.0", - "scipy>=1.14.0", "termcolor>=2.4.0", "torch>=2.2.1", "torchcodec>=0.2.1; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", @@ -87,6 +86,7 @@ dora = [ dynamixel = ["dynamixel-sdk>=3.7.31"] feetech = ["feetech-servo-sdk>=1.0.0"] gamepad = ["pygame>=2.5.1", "hidapi>=0.14.0"] +kinematics = ["placo>=0.9.6"] intelrealsense = [ "pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'", "pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'", @@ -100,7 +100,7 @@ stretch = [ "pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'" ] test = ["pytest>=8.1.0", "pytest-timeout>=2.4.0", "pytest-cov>=5.0.0", "pyserial>=3.5", "mock-serial>=0.0.1 ; sys_platform != 'win32'"] -hilserl = ["transformers>=4.50.3", "gym-hil>=0.1.8", "protobuf>=5.29.3", "grpcio==1.71.0"] +hilserl = ["transformers>=4.50.3", "gym-hil>=0.1.9", "protobuf>=5.29.3", "grpcio==1.71.0", "placo>=0.9.6"] umi = ["imagecodecs>=2024.1.1"] video_benchmark = ["scikit-image>=0.23.2", "pandas>=2.2.2"] xarm = ["gym-xarm>=0.1.1 ; python_version < '4.0'"] diff --git a/src/lerobot/model/kinematics.py b/src/lerobot/model/kinematics.py index 367b609e1..f059b9790 100644 --- a/src/lerobot/model/kinematics.py +++ b/src/lerobot/model/kinematics.py @@ -12,472 +12,117 @@ # See the License for the specific language governing permissions and # limitations under the License. - import numpy as np -from numpy.typing import NDArray -from scipy.spatial.transform import Rotation - - -def skew_symmetric(w: NDArray[np.float32]) -> NDArray[np.float32]: - """Creates the skew-symmetric matrix from a 3D vector.""" - return np.array([[0, -w[2], w[1]], [w[2], 0, -w[0]], [-w[1], w[0], 0]]) - - -def rodrigues_rotation(w: NDArray[np.float32], theta: float) -> NDArray[np.float32]: - """Computes the rotation matrix using Rodrigues' formula.""" - w_hat = skew_symmetric(w) - return np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat - - -def screw_axis_to_transform(s: NDArray[np.float32], theta: float) -> NDArray[np.float32]: - """Converts a screw axis to a 4x4 transformation matrix.""" - screw_axis_rot = s[:3] - screw_axis_trans = s[3:] - - # Pure translation - if np.allclose(screw_axis_rot, 0) and np.linalg.norm(screw_axis_trans) == 1: - transform = np.eye(4) - transform[:3, 3] = screw_axis_trans * theta - - # Rotation (and potentially translation) - elif np.linalg.norm(screw_axis_rot) == 1: - w_hat = skew_symmetric(screw_axis_rot) - rot_mat = np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat - t = ( - np.eye(3) * theta + (1 - np.cos(theta)) * w_hat + (theta - np.sin(theta)) * w_hat @ w_hat - ) @ screw_axis_trans - transform = np.eye(4) - transform[:3, :3] = rot_mat - transform[:3, 3] = t - else: - raise ValueError("Invalid screw axis parameters") - return transform - - -def pose_difference_se3(pose1: NDArray[np.float32], pose2: NDArray[np.float32]) -> NDArray[np.float32]: - """ - Calculates the SE(3) difference between two 4x4 homogeneous transformation matrices. - SE(3) (Special Euclidean Group) represents rigid body transformations in 3D space, - combining rotation (SO(3)) and translation. - - Each 4x4 matrix has the following structure: - [R11 R12 R13 tx] - [R21 R22 R23 ty] - [R31 R32 R33 tz] - [ 0 0 0 1] - - where R is the 3x3 rotation matrix and [tx,ty,tz] is the translation vector. - - Args: - pose1: A 4x4 numpy array representing the first pose. - pose2: A 4x4 numpy array representing the second pose. - - Returns: - A 6D numpy array concatenating translation and rotation differences. - First 3 elements are the translational difference (position). - Last 3 elements are the rotational difference in axis-angle representation. - """ - rot1 = pose1[:3, :3] - rot2 = pose2[:3, :3] - - translation_diff = pose1[:3, 3] - pose2[:3, 3] - - # Calculate rotational difference using scipy's Rotation library - rot_diff = Rotation.from_matrix(rot1 @ rot2.T) - rotation_diff = rot_diff.as_rotvec() # Axis-angle representation - - return np.concatenate([translation_diff, rotation_diff]) - - -def se3_error(target_pose: NDArray[np.float32], current_pose: NDArray[np.float32]) -> NDArray[np.float32]: - pos_error = target_pose[:3, 3] - current_pose[:3, 3] - - rot_target = target_pose[:3, :3] - rot_current = current_pose[:3, :3] - rot_error_mat = rot_target @ rot_current.T - rot_error = Rotation.from_matrix(rot_error_mat).as_rotvec() - - return np.concatenate([pos_error, rot_error]) class RobotKinematics: - """Robot kinematics class supporting multiple robot models.""" + """Robot kinematics using placo library for forward and inverse kinematics.""" - # Robot measurements dictionary - ROBOT_MEASUREMENTS = { - "koch": { - "gripper": [0.239, -0.001, 0.024], - "wrist": [0.209, 0, 0.024], - "forearm": [0.108, 0, 0.02], - "humerus": [0, 0, 0.036], - "shoulder": [0, 0, 0], - "base": [0, 0, 0.02], - }, - "moss": { - "gripper": [0.246, 0.013, 0.111], - "wrist": [0.245, 0.002, 0.064], - "forearm": [0.122, 0, 0.064], - "humerus": [0.001, 0.001, 0.063], - "shoulder": [0, 0, 0], - "base": [0, 0, 0.02], - }, - "so_old_calibration": { - "gripper": [0.320, 0, 0.050], - "wrist": [0.278, 0, 0.050], - "forearm": [0.143, 0, 0.044], - "humerus": [0.031, 0, 0.072], - "shoulder": [0, 0, 0], - "base": [0, 0, 0.02], - }, - "so_new_calibration": { - "gripper": [0.33, 0.0, 0.285], - "wrist": [0.30, 0.0, 0.267], - "forearm": [0.25, 0.0, 0.266], - "humerus": [0.06, 0.0, 0.264], - "shoulder": [0.0, 0.0, 0.238], - "base": [0.0, 0.0, 0.12], - }, - } - - def __init__(self, robot_type: str = "so100"): - """Initialize kinematics for the specified robot type. - - Args: - robot_type: String specifying the robot model ("koch", "so100", or "moss") - """ - if robot_type not in self.ROBOT_MEASUREMENTS: - raise ValueError( - f"Unknown robot type: {robot_type}. Available types: {list(self.ROBOT_MEASUREMENTS.keys())}" - ) - - self.robot_type = robot_type - self.measurements = self.ROBOT_MEASUREMENTS[robot_type] - - # Initialize all transformation matrices and screw axes - self._setup_transforms() - - def _create_translation_matrix( - self, x: float = 0.0, y: float = 0.0, z: float = 0.0 - ) -> NDArray[np.float32]: - """Create a 4x4 translation matrix.""" - return np.array([[1, 0, 0, x], [0, 1, 0, y], [0, 0, 1, z], [0, 0, 0, 1]]) - - def _setup_transforms(self): - """Setup all transformation matrices and screw axes for the robot.""" - # Set up rotation matrices (constant across robot types) - - # Gripper orientation - self.gripper_X0 = np.array( - [ - [1, 0, 0, 0], - [0, 0, 1, 0], - [0, -1, 0, 0], - [0, 0, 0, 1], - ], - dtype=np.float32, - ) - - # Wrist orientation - self.wrist_X0 = np.array( - [ - [0, -1, 0, 0], - [1, 0, 0, 0], - [0, 0, 1, 0], - [0, 0, 0, 1], - ], - dtype=np.float32, - ) - - # Base orientation - self.base_X0 = np.array( - [ - [0, 0, 1, 0], - [1, 0, 0, 0], - [0, 1, 0, 0], - [0, 0, 0, 1], - ], - dtype=np.float32, - ) - - # Gripper - # Screw axis of gripper frame wrt base frame - self.S_BG = np.array( - [ - 1, - 0, - 0, - 0, - self.measurements["gripper"][2], - -self.measurements["gripper"][1], - ], - dtype=np.float32, - ) - - # Gripper origin to centroid transform - self.X_GoGc = self._create_translation_matrix(x=0.07) - - # Gripper origin to tip transform - self.X_GoGt = self._create_translation_matrix(x=0.12) - - # 0-position gripper frame pose wrt base - self.X_BoGo = self._create_translation_matrix( - x=self.measurements["gripper"][0], - y=self.measurements["gripper"][1], - z=self.measurements["gripper"][2], - ) - - # Wrist - # Screw axis of wrist frame wrt base frame - self.S_BR = np.array( - [0, 1, 0, -self.measurements["wrist"][2], 0, self.measurements["wrist"][0]], dtype=np.float32 - ) - - # 0-position origin to centroid transform - self.X_RoRc = self._create_translation_matrix(x=0.0035, y=-0.002) - - # 0-position wrist frame pose wrt base - self.X_BR = self._create_translation_matrix( - x=self.measurements["wrist"][0], - y=self.measurements["wrist"][1], - z=self.measurements["wrist"][2], - ) - - # Forearm - # Screw axis of forearm frame wrt base frame - self.S_BF = np.array( - [ - 0, - 1, - 0, - -self.measurements["forearm"][2], - 0, - self.measurements["forearm"][0], - ], - dtype=np.float32, - ) - - # Forearm origin + centroid transform - self.X_ForearmFc = self._create_translation_matrix(x=0.036) - - # 0-position forearm frame pose wrt base - self.X_BF = self._create_translation_matrix( - x=self.measurements["forearm"][0], - y=self.measurements["forearm"][1], - z=self.measurements["forearm"][2], - ) - - # Humerus - # Screw axis of humerus frame wrt base frame - self.S_BH = np.array( - [ - 0, - -1, - 0, - self.measurements["humerus"][2], - 0, - -self.measurements["humerus"][0], - ], - dtype=np.float32, - ) - - # Humerus origin to centroid transform - self.X_HoHc = self._create_translation_matrix(x=0.0475) - - # 0-position humerus frame pose wrt base - self.X_BH = self._create_translation_matrix( - x=self.measurements["humerus"][0], - y=self.measurements["humerus"][1], - z=self.measurements["humerus"][2], - ) - - # Shoulder - # Screw axis of shoulder frame wrt Base frame - self.S_BS = np.array([0, 0, -1, 0, 0, 0], dtype=np.float32) - - # Shoulder origin to centroid transform - self.X_SoSc = self._create_translation_matrix(x=-0.017, z=0.0235) - - # 0-position shoulder frame pose wrt base - self.X_BS = self._create_translation_matrix( - x=self.measurements["shoulder"][0], - y=self.measurements["shoulder"][1], - z=self.measurements["shoulder"][2], - ) - - # Base - # Base origin to centroid transform - self.X_BoBc = self._create_translation_matrix(y=0.015) - - # World to base transform - self.X_WoBo = self._create_translation_matrix( - x=self.measurements["base"][0], - y=self.measurements["base"][1], - z=self.measurements["base"][2], - ) - - # Pre-compute gripper post-multiplication matrix - self._fk_gripper_post = self.X_GoGc @ self.X_BoGo @ self.gripper_X0 - - def forward_kinematics( + def __init__( self, - robot_pos_deg: NDArray[np.float32], - frame: str = "gripper_tip", - ) -> NDArray[np.float32]: - """Generic forward kinematics. - - Args: - robot_pos_deg: Joint positions in degrees. Can be ``None`` when - computing the *base* frame as it does not depend on joint - angles. - frame: Target frame. One of - ``{"base", "shoulder", "humerus", "forearm", "wrist", "gripper", "gripper_tip"}``. - - Returns - ------- - NDArray[np.float32] - 4×4 homogeneous transformation matrix of the requested frame - expressed in the world coordinate system. + urdf_path: str, + target_frame_name: str = "gripper_frame_link", + joint_names: list[str] = None, + ): """ - frame = frame.lower() - if frame not in { - "base", - "shoulder", - "humerus", - "forearm", - "wrist", - "gripper", - "gripper_tip", - }: - raise ValueError( - f"Unknown frame '{frame}'. Valid options are base, shoulder, humerus, forearm, wrist, gripper, gripper_tip." - ) - - # Base frame does not rely on joint angles. - if frame == "base": - return self.X_WoBo @ self.X_BoBc @ self.base_X0 - - robot_pos_rad = robot_pos_deg / 180 * np.pi - - # Extract joint angles (note the sign convention for shoulder lift). - theta_shoulder_pan = robot_pos_rad[0] - theta_shoulder_lift = -robot_pos_rad[1] - theta_elbow_flex = robot_pos_rad[2] - theta_wrist_flex = robot_pos_rad[3] - theta_wrist_roll = robot_pos_rad[4] - - # Start with the world-to-base transform; incrementally add successive links. - transformation_matrix = self.X_WoBo @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan) - if frame == "shoulder": - return transformation_matrix @ self.X_SoSc @ self.X_BS - - transformation_matrix = transformation_matrix @ screw_axis_to_transform( - self.S_BH, theta_shoulder_lift - ) - if frame == "humerus": - return transformation_matrix @ self.X_HoHc @ self.X_BH - - transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BF, theta_elbow_flex) - if frame == "forearm": - return transformation_matrix @ self.X_ForearmFc @ self.X_BF - - transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BR, theta_wrist_flex) - if frame == "wrist": - return transformation_matrix @ self.X_RoRc @ self.X_BR @ self.wrist_X0 - - transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BG, theta_wrist_roll) - if frame == "gripper": - return transformation_matrix @ self._fk_gripper_post - else: # frame == "gripper_tip" - return transformation_matrix @ self.X_GoGt @ self.X_BoGo @ self.gripper_X0 - - def compute_jacobian( - self, robot_pos_deg: NDArray[np.float32], frame: str = "gripper_tip" - ) -> NDArray[np.float32]: - """Finite differences to compute the Jacobian. - J(i, j) represents how the ith component of the end-effector's velocity changes wrt a small change - in the jth joint's velocity. + Initialize placo-based kinematics solver. Args: - robot_pos_deg: Current joint positions in degrees - fk_func: Forward kinematics function to use (defaults to fk_gripper) + urdf_path: Path to the robot URDF file + target_frame_name: Name of the end-effector frame in the URDF + joint_names: List of joint names to use for the kinematics solver + """ + try: + import placo + except ImportError as e: + raise ImportError( + "placo is required for RobotKinematics. " + "Please install the optional dependencies of `kinematics` in the package." + ) from e + + self.robot = placo.RobotWrapper(urdf_path) + self.solver = placo.KinematicsSolver(self.robot) + self.solver.mask_fbase(True) # Fix the base + + self.target_frame_name = target_frame_name + + # Set joint names + self.joint_names = list(self.robot.joint_names()) if joint_names is None else joint_names + + # Initialize frame task for IK + self.tip_frame = self.solver.add_frame_task(self.target_frame_name, np.eye(4)) + + def forward_kinematics(self, joint_pos_deg): + """ + Compute forward kinematics for given joint configuration given the target frame name in the constructor. + + Args: + joint_pos_deg: Joint positions in degrees (numpy array) + + Returns: + 4x4 transformation matrix of the end-effector pose """ - eps = 1e-8 - jac = np.zeros(shape=(6, 5)) - delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64) - for el_ix in range(len(robot_pos_deg[:-1])): - delta *= 0 - delta[el_ix] = eps / 2 - sdot = ( - pose_difference_se3( - self.forward_kinematics(robot_pos_deg[:-1] + delta, frame), - self.forward_kinematics(robot_pos_deg[:-1] - delta, frame), - ) - / eps - ) - jac[:, el_ix] = sdot - return jac + # Convert degrees to radians + joint_pos_rad = np.deg2rad(joint_pos_deg[: len(self.joint_names)]) - def compute_positional_jacobian( - self, robot_pos_deg: NDArray[np.float32], frame: str = "gripper_tip" - ) -> NDArray[np.float32]: - """Finite differences to compute the positional Jacobian. - J(i, j) represents how the ith component of the end-effector's position changes wrt a small change - in the jth joint's velocity. + # Update joint positions in placo robot + for i, joint_name in enumerate(self.joint_names): + self.robot.set_joint(joint_name, joint_pos_rad[i]) - Args: - robot_pos_deg: Current joint positions in degrees - fk_func: Forward kinematics function to use (defaults to fk_gripper) + # Update kinematics + self.robot.update_kinematics() + + # Get the transformation matrix + return self.robot.get_T_world_frame(self.target_frame_name) + + def inverse_kinematics( + self, current_joint_pos, desired_ee_pose, position_weight=1.0, orientation_weight=0.01 + ): """ - eps = 1e-8 - jac = np.zeros(shape=(3, 5)) - delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64) - for el_ix in range(len(robot_pos_deg[:-1])): - delta *= 0 - delta[el_ix] = eps / 2 - sdot = ( - self.forward_kinematics(robot_pos_deg[:-1] + delta, frame)[:3, 3] - - self.forward_kinematics(robot_pos_deg[:-1] - delta, frame)[:3, 3] - ) / eps - jac[:, el_ix] = sdot - return jac - - def ik( - self, - current_joint_pos: NDArray[np.float32], - desired_ee_pose: NDArray[np.float32], - position_only: bool = True, - frame: str = "gripper_tip", - max_iterations: int = 5, - learning_rate: float = 1, - ) -> NDArray[np.float32]: - """Inverse kinematics using gradient descent. + Compute inverse kinematics using placo solver. Args: - current_joint_state: Initial joint positions in degrees + current_joint_pos: Current joint positions in degrees (used as initial guess) desired_ee_pose: Target end-effector pose as a 4x4 transformation matrix - position_only: If True, only match end-effector position, not orientation - frame: Target frame. One of - ``{"base", "shoulder", "humerus", "forearm", "wrist", "gripper", "gripper_tip"}``. - max_iterations: Maximum number of iterations to run - learning_rate: Learning rate for gradient descent + position_weight: Weight for position constraint in IK + orientation_weight: Weight for orientation constraint in IK, set to 0.0 to only constrain position Returns: Joint positions in degrees that achieve the desired end-effector pose """ - # Do gradient descent. - current_joint_state = current_joint_pos.copy() - for _ in range(max_iterations): - current_ee_pose = self.forward_kinematics(current_joint_state, frame) - if not position_only: - error = se3_error(desired_ee_pose, current_ee_pose) - jac = self.compute_jacobian(current_joint_state, frame) - else: - error = desired_ee_pose[:3, 3] - current_ee_pose[:3, 3] - jac = self.compute_positional_jacobian(current_joint_state, frame) - delta_angles = np.linalg.pinv(jac) @ error - current_joint_state[:-1] += learning_rate * delta_angles - if np.linalg.norm(error) < 5e-3: - return current_joint_state - return current_joint_state + # Convert current joint positions to radians for initial guess + current_joint_rad = np.deg2rad(current_joint_pos[: len(self.joint_names)]) + + # Set current joint positions as initial guess + for i, joint_name in enumerate(self.joint_names): + self.robot.set_joint(joint_name, current_joint_rad[i]) + + # Update the target pose for the frame task + self.tip_frame.T_world_frame = desired_ee_pose + + # Configure the task based on position_only flag + self.tip_frame.configure(self.target_frame_name, "soft", position_weight, orientation_weight) + + # Solve IK + self.solver.solve(True) + self.robot.update_kinematics() + + # Extract joint positions + joint_pos_rad = [] + for joint_name in self.joint_names: + joint = self.robot.get_joint(joint_name) + joint_pos_rad.append(joint) + + # Convert back to degrees + joint_pos_deg = np.rad2deg(joint_pos_rad) + + # Preserve gripper position if present in current_joint_pos + if len(current_joint_pos) > len(self.joint_names): + result = np.zeros_like(current_joint_pos) + result[: len(self.joint_names)] = joint_pos_deg + result[len(self.joint_names) :] = current_joint_pos[len(self.joint_names) :] + return result + else: + return joint_pos_deg diff --git a/src/lerobot/robots/so100_follower/config_so100_follower.py b/src/lerobot/robots/so100_follower/config_so100_follower.py index 249b44960..7cd23d340 100644 --- a/src/lerobot/robots/so100_follower/config_so100_follower.py +++ b/src/lerobot/robots/so100_follower/config_so100_follower.py @@ -44,6 +44,14 @@ class SO100FollowerConfig(RobotConfig): class SO100FollowerEndEffectorConfig(SO100FollowerConfig): """Configuration for the SO100FollowerEndEffector robot.""" + # Path to URDF file for kinematics + # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: + # https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf + urdf_path: str | None = None + + # End-effector frame name in the URDF + target_frame_name: str = "gripper_frame_link" + # Default bounds for the end-effector position (in meters) end_effector_bounds: dict[str, list[float]] = field( default_factory=lambda: { diff --git a/src/lerobot/robots/so100_follower/so100_follower_end_effector.py b/src/lerobot/robots/so100_follower/so100_follower_end_effector.py index d01b60e93..5fe2993cb 100644 --- a/src/lerobot/robots/so100_follower/so100_follower_end_effector.py +++ b/src/lerobot/robots/so100_follower/so100_follower_end_effector.py @@ -30,7 +30,6 @@ from . import SO100Follower from .config_so100_follower import SO100FollowerEndEffectorConfig logger = logging.getLogger(__name__) -EE_FRAME = "gripper_tip" class SO100FollowerEndEffector(SO100Follower): @@ -64,7 +63,16 @@ class SO100FollowerEndEffector(SO100Follower): self.config = config # Initialize the kinematics module for the so100 robot - self.kinematics = RobotKinematics(robot_type="so_new_calibration") + if self.config.urdf_path is None: + raise ValueError( + "urdf_path must be provided in the configuration for end-effector control. " + "Please set urdf_path in your SO100FollowerEndEffectorConfig." + ) + + self.kinematics = RobotKinematics( + urdf_path=self.config.urdf_path, + target_frame_name=self.config.target_frame_name, + ) # Store the bounds for end-effector position self.end_effector_bounds = self.config.end_effector_bounds @@ -126,7 +134,7 @@ class SO100FollowerEndEffector(SO100Follower): # Calculate current end-effector position using forward kinematics if self.current_ee_pos is None: - self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos, frame=EE_FRAME) + self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos) # Set desired end-effector position by adding delta desired_ee_pos = np.eye(4) @@ -142,11 +150,10 @@ class SO100FollowerEndEffector(SO100Follower): ) # Compute inverse kinematics to get joint positions - target_joint_values_in_degrees = self.kinematics.ik( - self.current_joint_pos, desired_ee_pos, position_only=True, frame=EE_FRAME + target_joint_values_in_degrees = self.kinematics.inverse_kinematics( + self.current_joint_pos, desired_ee_pos ) - target_joint_values_in_degrees = np.clip(target_joint_values_in_degrees, -180.0, 180.0) # Create joint space action dictionary joint_action = { f"{key}.pos": target_joint_values_in_degrees[i] for i, key in enumerate(self.bus.motors.keys()) diff --git a/src/lerobot/scripts/find_joint_limits.py b/src/lerobot/scripts/find_joint_limits.py index 346edf8ac..c61806d23 100644 --- a/src/lerobot/scripts/find_joint_limits.py +++ b/src/lerobot/scripts/find_joint_limits.py @@ -36,6 +36,7 @@ from dataclasses import dataclass import draccus import numpy as np +from lerobot.common.utils.robot_utils import busy_wait from lerobot.model.kinematics import RobotKinematics from lerobot.robots import ( # noqa: F401 RobotConfig, @@ -76,12 +77,12 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig): # Note to be compatible with the rest of the codebase, # we are using the new calibration method for so101 and so100 robot_type = "so_new_calibration" - kinematics = RobotKinematics(robot_type=robot_type) + kinematics = RobotKinematics(cfg.robot.urdf_path, cfg.robot.target_frame_name) # Initialize min/max values observation = robot.get_observation() joint_positions = np.array([observation[f"{key}.pos"] for key in robot.bus.motors]) - ee_pos = kinematics.forward_kinematics(joint_positions, frame="gripper_tip")[:3, 3] + ee_pos = kinematics.forward_kinematics(joint_positions)[:3, 3] max_pos = joint_positions.copy() min_pos = joint_positions.copy() @@ -94,7 +95,7 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig): observation = robot.get_observation() joint_positions = np.array([observation[f"{key}.pos"] for key in robot.bus.motors]) - ee_pos = kinematics.forward_kinematics(joint_positions, frame="gripper_tip")[:3, 3] + ee_pos = kinematics.forward_kinematics(joint_positions)[:3, 3] # Skip initial warmup period if (time.perf_counter() - start_episode_t) < 5: @@ -113,6 +114,8 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig): print(f"Min joint pos position {np.round(min_pos, 4).tolist()}") break + busy_wait(0.01) + if __name__ == "__main__": find_joint_and_ee_bounds() diff --git a/src/lerobot/scripts/rl/crop_dataset_roi.py b/src/lerobot/scripts/rl/crop_dataset_roi.py index 4cb7a3e8a..0b71b5363 100644 --- a/src/lerobot/scripts/rl/crop_dataset_roi.py +++ b/src/lerobot/scripts/rl/crop_dataset_roi.py @@ -21,8 +21,7 @@ from pathlib import Path from typing import Dict, Tuple import cv2 - -# import torch.nn.functional as F # noqa: N812 +import torch import torchvision.transforms.functional as F # type: ignore # noqa: N812 from tqdm import tqdm # type: ignore @@ -224,7 +223,8 @@ def convert_lerobot_dataset_to_cropper_lerobot_dataset( cropped = F.crop(value, top, left, height, width) value = F.resize(cropped, resize_size) value = value.clamp(0, 1) - + if key.startswith("complementary_info") and isinstance(value, torch.Tensor) and value.dim() == 0: + value = value.unsqueeze(0) new_frame[key] = value new_dataset.add_frame(new_frame, task=task) @@ -265,8 +265,7 @@ if __name__ == "__main__": ) parser.add_argument( "--push-to-hub", - type=bool, - default=False, + action="store_true", help="Whether to push the new dataset to the hub.", ) parser.add_argument( diff --git a/src/lerobot/scripts/rl/gym_manipulator.py b/src/lerobot/scripts/rl/gym_manipulator.py index 76a136084..673043b6e 100644 --- a/src/lerobot/scripts/rl/gym_manipulator.py +++ b/src/lerobot/scripts/rl/gym_manipulator.py @@ -254,20 +254,19 @@ class RobotEnv(gym.Env): self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors] self._image_keys = self.robot.cameras.keys() - # Read initial joint positions using the bus - self.current_joint_positions = self._get_observation()["agent_pos"] + self.current_observation = None self.use_gripper = use_gripper self._setup_spaces() - def _get_observation(self) -> np.ndarray: + def _get_observation(self) -> dict[str, np.ndarray]: """Helper to convert a dictionary from bus.sync_read to an ordered numpy array.""" obs_dict = self.robot.get_observation() - joint_positions = np.array([obs_dict[name] for name in self._joint_names], dtype=np.float32) + joint_positions = np.array([obs_dict[name] for name in self._joint_names]) images = {key: obs_dict[key] for key in self._image_keys} - return {"agent_pos": joint_positions, "pixels": images} + self.current_observation = {"agent_pos": joint_positions, "pixels": images} def _setup_spaces(self): """ @@ -281,24 +280,24 @@ class RobotEnv(gym.Env): - The action space is defined as a Box space representing joint position commands. It is defined as relative (delta) or absolute, based on the configuration. """ - example_obs = self._get_observation() + self._get_observation() observation_spaces = {} # Define observation spaces for images and other states. - if "pixels" in example_obs: - prefix = "observation.images" if len(example_obs["pixels"]) > 1 else "observation.image" + if "pixels" in self.current_observation: + prefix = "observation.images" observation_spaces = { f"{prefix}.{key}": gym.spaces.Box( - low=0, high=255, shape=example_obs["pixels"][key].shape, dtype=np.uint8 + low=0, high=255, shape=self.current_observation["pixels"][key].shape, dtype=np.uint8 ) - for key in example_obs["pixels"] + for key in self.current_observation["pixels"] } observation_spaces["observation.state"] = gym.spaces.Box( low=0, high=10, - shape=example_obs["agent_pos"].shape, + shape=self.current_observation["agent_pos"].shape, dtype=np.float32, ) @@ -340,14 +339,12 @@ class RobotEnv(gym.Env): self.robot.reset() - # Capture the initial observation. - observation = self._get_observation() - # Reset episode tracking variables. self.current_step = 0 self.episode_data = None - - return observation, {"is_intervention": False} + self.current_observation = None + self._get_observation() + return self.current_observation, {"is_intervention": False} def step(self, action) -> tuple[dict[str, np.ndarray], float, bool, bool, dict[str, Any]]: """ @@ -367,8 +364,6 @@ class RobotEnv(gym.Env): - truncated (bool): True if the episode was truncated (e.g., time constraints). - info (dict): Additional debugging information including intervention status. """ - self.current_joint_positions = self._get_observation()["agent_pos"] - action_dict = {"delta_x": action[0], "delta_y": action[1], "delta_z": action[2]} # 1.0 action corresponds to no-op action @@ -376,6 +371,8 @@ class RobotEnv(gym.Env): self.robot.send_action(action_dict) + self._get_observation() + if self.display_cameras: self.render() @@ -386,7 +383,7 @@ class RobotEnv(gym.Env): truncated = False return ( - self._get_observation(), + self.current_observation, reward, terminated, truncated, @@ -399,11 +396,10 @@ class RobotEnv(gym.Env): """ import cv2 - observation = self._get_observation() - image_keys = [key for key in observation if "image" in key] + image_keys = [key for key in self.current_observation if "image" in key] for key in image_keys: - cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR)) + cv2.imshow(key, cv2.cvtColor(self.current_observation[key].numpy(), cv2.COLOR_RGB2BGR)) cv2.waitKey(1) def close(self): @@ -520,7 +516,10 @@ class AddCurrentToObservation(gym.ObservationWrapper): Returns: The modified observation with current values. """ - present_current_observation = self.unwrapped._get_observation()["agent_pos"] + present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current") + present_current_observation = np.array( + [present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors] + ) observation["agent_pos"] = np.concatenate( [observation["agent_pos"], present_current_observation], axis=-1 ) @@ -1090,13 +1089,10 @@ class EEObservationWrapper(gym.ObservationWrapper): dtype=np.float32, ) - # Initialize kinematics instance for the appropriate robot type - robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so101") - if "so100" in robot_type or "so101" in robot_type: - # Note to be compatible with the rest of the codebase, - # we are using the new calibration method for so101 and so100 - robot_type = "so_new_calibration" - self.kinematics = RobotKinematics(robot_type) + self.kinematics = RobotKinematics( + urdf_path=env.unwrapped.robot.config.urdf_path, + target_frame_name=env.unwrapped.robot.config.target_frame_name, + ) def observation(self, observation): """ @@ -1108,9 +1104,9 @@ class EEObservationWrapper(gym.ObservationWrapper): Returns: Enhanced observation with end-effector pose information. """ - current_joint_pos = self.unwrapped._get_observation()["agent_pos"] + current_joint_pos = self.unwrapped.current_observation["agent_pos"] - current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos, frame="gripper_tip")[:3, 3] + current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos)[:3, 3] observation["agent_pos"] = np.concatenate([observation["agent_pos"], current_ee_pos], -1) return observation @@ -1157,12 +1153,10 @@ class BaseLeaderControlWrapper(gym.Wrapper): self.event_lock = Lock() # Thread-safe access to events # Initialize robot control - robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so101") - if "so100" in robot_type or "so101" in robot_type: - # Note to be compatible with the rest of the codebase, - # we are using the new calibration method for so101 and so100 - robot_type = "so_new_calibration" - self.kinematics = RobotKinematics(robot_type) + self.kinematics = RobotKinematics( + urdf_path=env.unwrapped.robot.config.urdf_path, + target_frame_name=env.unwrapped.robot.config.target_frame_name, + ) self.leader_torque_enabled = True self.prev_leader_gripper = None @@ -1260,14 +1254,14 @@ class BaseLeaderControlWrapper(gym.Wrapper): leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position") follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position") - leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict], dtype=np.float32) - follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict], dtype=np.float32) + leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict]) + follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict]) self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - leader_pos[:-1])) # [:3, 3] Last column of the transformation matrix corresponds to the xyz translation - leader_ee = self.kinematics.forward_kinematics(leader_pos, frame="gripper_tip")[:3, 3] - follower_ee = self.kinematics.forward_kinematics(follower_pos, frame="gripper_tip")[:3, 3] + leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3] + follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3] action = np.clip(leader_ee - follower_ee, -self.end_effector_step_sizes, self.end_effector_step_sizes) # Normalize the action to the range [-1, 1] @@ -1341,6 +1335,9 @@ class BaseLeaderControlWrapper(gym.Wrapper): # NOTE: obs, reward, terminated, truncated, info = self.env.step(action) + if isinstance(action, np.ndarray): + action = torch.from_numpy(action) + # Add intervention info info["is_intervention"] = is_intervention info["action_intervention"] = action @@ -1877,7 +1874,6 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env: if cfg.robot is None: raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.") robot = make_robot_from_config(cfg.robot) - teleop_device = make_teleoperator_from_config(cfg.teleop) teleop_device.connect() diff --git a/src/lerobot/teleoperators/gamepad/gamepad_utils.py b/src/lerobot/teleoperators/gamepad/gamepad_utils.py index 21a293c77..9b62dc666 100644 --- a/src/lerobot/teleoperators/gamepad/gamepad_utils.py +++ b/src/lerobot/teleoperators/gamepad/gamepad_utils.py @@ -295,8 +295,8 @@ class GamepadController(InputController): try: # Read joystick axes # Left stick X and Y (typically axes 0 and 1) - x_input = self.joystick.get_axis(0) # Left/Right - y_input = self.joystick.get_axis(1) # Up/Down (often inverted) + y_input = self.joystick.get_axis(0) # Left/Right + x_input = self.joystick.get_axis(1) # Up/Down (often inverted) # Right stick Y (typically axis 3 or 4) z_input = self.joystick.get_axis(3) # Up/Down for Z @@ -307,8 +307,8 @@ class GamepadController(InputController): z_input = 0 if abs(z_input) < self.deadzone else z_input # Calculate deltas (note: may need to invert axes depending on controller) - delta_x = -y_input * self.y_step_size # Forward/backward - delta_y = -x_input * self.x_step_size # Left/right + delta_x = -x_input * self.x_step_size # Forward/backward + delta_y = -y_input * self.y_step_size # Left/right delta_z = -z_input * self.z_step_size # Up/down return delta_x, delta_y, delta_z @@ -424,14 +424,14 @@ class GamepadControllerHID(InputController): # These offsets are for the Logitech RumblePad 2 if data and len(data) >= 8: # Normalize joystick values from 0-255 to -1.0-1.0 - self.left_x = (data[1] - 128) / 128.0 - self.left_y = (data[2] - 128) / 128.0 + self.left_y = (data[1] - 128) / 128.0 + self.left_x = (data[2] - 128) / 128.0 self.right_x = (data[3] - 128) / 128.0 self.right_y = (data[4] - 128) / 128.0 # Apply deadzone - self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y + self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y @@ -465,8 +465,8 @@ class GamepadControllerHID(InputController): def get_deltas(self): """Get the current movement deltas from gamepad state.""" # Calculate deltas - invert as needed based on controller orientation - delta_x = -self.left_y * self.x_step_size # Forward/backward - delta_y = -self.left_x * self.y_step_size # Left/right + delta_x = -self.left_x * self.x_step_size # Forward/backward + delta_y = -self.left_y * self.y_step_size # Left/right delta_z = -self.right_y * self.z_step_size # Up/down return delta_x, delta_y, delta_z diff --git a/src/lerobot/teleoperators/keyboard/teleop_keyboard.py b/src/lerobot/teleoperators/keyboard/teleop_keyboard.py index 41a4293cc..d034982f1 100644 --- a/src/lerobot/teleoperators/keyboard/teleop_keyboard.py +++ b/src/lerobot/teleoperators/keyboard/teleop_keyboard.py @@ -196,17 +196,18 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop): delta_x = 0.0 delta_y = 0.0 delta_z = 0.0 + gripper_action = 1.0 # Generate action based on current key states for key, val in self.current_pressed.items(): if key == keyboard.Key.up: - delta_x = int(val) - elif key == keyboard.Key.down: - delta_x = -int(val) - elif key == keyboard.Key.left: - delta_y = int(val) - elif key == keyboard.Key.right: delta_y = -int(val) + elif key == keyboard.Key.down: + delta_y = int(val) + elif key == keyboard.Key.left: + delta_x = int(val) + elif key == keyboard.Key.right: + delta_x = -int(val) elif key == keyboard.Key.shift: delta_z = -int(val) elif key == keyboard.Key.shift_r: @@ -230,7 +231,6 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop): "delta_z": delta_z, } - gripper_action = 1 # default gripper action is to stay if self.config.use_gripper: action_dict["gripper"] = gripper_action