From ab851472964d4ba9b627c3187496a84d5bbc8af7 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Tue, 20 May 2025 21:20:28 +0200 Subject: [PATCH] Added gamepad teleoperator and so100follower end effector robots --- lerobot/common/model/__init__.py | 3 + lerobot/common/model/kinematics.py | 546 +++++++++++++ lerobot/common/motors/motors_bus.py | 11 +- .../so100_follower_end_effector/__init__.py | 2 + .../config_so100_follower_end_effector.py | 45 ++ .../so100_follower_end_effector.py | 161 ++++ lerobot/common/robots/utils.py | 11 +- .../common/teleoperators/gamepad/__init__.py | 4 + .../gamepad/configuration_gamepad.py | 32 + .../teleoperators/gamepad/gamepad_utils.py | 719 ++++++++++++++++++ .../teleoperators/gamepad/teleop_gamepad.py | 135 ++++ lerobot/common/utils/utils.py | 5 +- lerobot/teleoperate.py | 2 +- 13 files changed, 1662 insertions(+), 14 deletions(-) create mode 100644 lerobot/common/model/__init__.py create mode 100644 lerobot/common/model/kinematics.py create mode 100644 lerobot/common/robots/so100_follower_end_effector/__init__.py create mode 100644 lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py create mode 100644 lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py create mode 100644 lerobot/common/teleoperators/gamepad/__init__.py create mode 100644 lerobot/common/teleoperators/gamepad/configuration_gamepad.py create mode 100644 lerobot/common/teleoperators/gamepad/gamepad_utils.py create mode 100644 lerobot/common/teleoperators/gamepad/teleop_gamepad.py diff --git a/lerobot/common/model/__init__.py b/lerobot/common/model/__init__.py new file mode 100644 index 00000000..f4ab4b4f --- /dev/null +++ b/lerobot/common/model/__init__.py @@ -0,0 +1,3 @@ +from .kinematics import RobotKinematics + +__all__ = ["RobotKinematics"] diff --git a/lerobot/common/model/kinematics.py b/lerobot/common/model/kinematics.py new file mode 100644 index 00000000..c01ad0fe --- /dev/null +++ b/lerobot/common/model/kinematics.py @@ -0,0 +1,546 @@ +# ruff: noqa: N806, N815, N803 + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +from scipy.spatial.transform import Rotation + + +def skew_symmetric(w): + """Creates the skew-symmetric matrix from a 3D vector.""" + return np.array([[0, -w[2], w[1]], [w[2], 0, -w[0]], [-w[1], w[0], 0]]) + + +def rodrigues_rotation(w, theta): + """Computes the rotation matrix using Rodrigues' formula.""" + w_hat = skew_symmetric(w) + return np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat + + +def screw_axis_to_transform(S, theta): + """Converts a screw axis to a 4x4 transformation matrix.""" + S_w = S[:3] + S_v = S[3:] + if np.allclose(S_w, 0) and np.linalg.norm(S_v) == 1: # Pure translation + T = np.eye(4) + T[:3, 3] = S_v * theta + elif np.linalg.norm(S_w) == 1: # Rotation and translation + w_hat = skew_symmetric(S_w) + R = np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat + t = (np.eye(3) * theta + (1 - np.cos(theta)) * w_hat + (theta - np.sin(theta)) * w_hat @ w_hat) @ S_v + T = np.eye(4) + T[:3, :3] = R + T[:3, 3] = t + else: + raise ValueError("Invalid screw axis parameters") + return T + + +def pose_difference_se3(pose1, pose2): + """ + Calculates the SE(3) difference between two 4x4 homogeneous transformation matrices. + SE(3) (Special Euclidean Group) represents rigid body transformations in 3D space, combining rotation (SO(3)) and translation. + Each 4x4 matrix has the following structure, a 3x3 rotation matrix in the top-left and a 3x1 translation vector in the top-right: + + [R11 R12 R13 tx] + [R21 R22 R23 ty] + [R31 R32 R33 tz] + [ 0 0 0 1] + + where Rij is the 3x3 rotation matrix and [tx,ty,tz] is the translation vector. + + pose1 - pose2 + + Args: + pose1: A 4x4 numpy array representing the first pose. + pose2: A 4x4 numpy array representing the second pose. + + Returns: + A tuple (translation_diff, rotation_diff) where: + - translation_diff is a 3x1 numpy array representing the translational difference. + - rotation_diff is a 3x1 numpy array representing the rotational difference in axis-angle representation. + """ + + # Extract rotation matrices from poses + R1 = pose1[:3, :3] + R2 = pose2[:3, :3] + + # Calculate translational difference + translation_diff = pose1[:3, 3] - pose2[:3, 3] + + # Calculate rotational difference using scipy's Rotation library + R_diff = Rotation.from_matrix(R1 @ R2.T) + rotation_diff = R_diff.as_rotvec() # Convert to axis-angle representation + + return np.concatenate([translation_diff, rotation_diff]) + + +def se3_error(target_pose, current_pose): + pos_error = target_pose[:3, 3] - current_pose[:3, 3] + R_target = target_pose[:3, :3] + R_current = current_pose[:3, :3] + R_error = R_target @ R_current.T + rot_error = Rotation.from_matrix(R_error).as_rotvec() + return np.concatenate([pos_error, rot_error]) + + +class RobotKinematics: + """Robot kinematics class supporting multiple robot models.""" + + # Robot measurements dictionary + ROBOT_MEASUREMENTS = { + "koch": { + "gripper": [0.239, -0.001, 0.024], + "wrist": [0.209, 0, 0.024], + "forearm": [0.108, 0, 0.02], + "humerus": [0, 0, 0.036], + "shoulder": [0, 0, 0], + "base": [0, 0, 0.02], + }, + "so100": { + "gripper": [0.320, 0, 0.050], + "wrist": [0.278, 0, 0.050], + "forearm": [0.143, 0, 0.044], + "humerus": [0.031, 0, 0.072], + "shoulder": [0, 0, 0], + "base": [0, 0, 0.02], + }, + "moss": { + "gripper": [0.246, 0.013, 0.111], + "wrist": [0.245, 0.002, 0.064], + "forearm": [0.122, 0, 0.064], + "humerus": [0.001, 0.001, 0.063], + "shoulder": [0, 0, 0], + "base": [0, 0, 0.02], + }, + } + + def __init__(self, robot_type="so100"): + """Initialize kinematics for the specified robot type. + + Args: + robot_type: String specifying the robot model ("koch", "so100", or "moss") + """ + if robot_type not in self.ROBOT_MEASUREMENTS: + raise ValueError( + f"Unknown robot type: {robot_type}. Available types: {list(self.ROBOT_MEASUREMENTS.keys())}" + ) + + self.robot_type = robot_type + self.measurements = self.ROBOT_MEASUREMENTS[robot_type] + + # Initialize all transformation matrices and screw axes + self._setup_transforms() + + def _create_translation_matrix(self, x=0, y=0, z=0): + """Create a 4x4 translation matrix.""" + return np.array([[1, 0, 0, x], [0, 1, 0, y], [0, 0, 1, z], [0, 0, 0, 1]]) + + def _setup_transforms(self): + """Setup all transformation matrices and screw axes for the robot.""" + # Set up rotation matrices (constant across robot types) + + # Gripper orientation + self.gripper_X0 = np.array( + [ + [1, 0, 0, 0], + [0, 0, 1, 0], + [0, -1, 0, 0], + [0, 0, 0, 1], + ] + ) + + # Wrist orientation + self.wrist_X0 = np.array( + [ + [0, -1, 0, 0], + [1, 0, 0, 0], + [0, 0, 1, 0], + [0, 0, 0, 1], + ] + ) + + # Base orientation + self.base_X0 = np.array( + [ + [0, 0, 1, 0], + [1, 0, 0, 0], + [0, 1, 0, 0], + [0, 0, 0, 1], + ] + ) + + # Gripper + # Screw axis of gripper frame wrt base frame + self.S_BG = np.array( + [ + 1, + 0, + 0, + 0, + self.measurements["gripper"][2], + -self.measurements["gripper"][1], + ] + ) + + # Gripper origin to centroid transform + self.X_GoGc = self._create_translation_matrix(x=0.07) + + # Gripper origin to tip transform + self.X_GoGt = self._create_translation_matrix(x=0.12) + + # 0-position gripper frame pose wrt base + self.X_BoGo = self._create_translation_matrix( + x=self.measurements["gripper"][0], + y=self.measurements["gripper"][1], + z=self.measurements["gripper"][2], + ) + + # Wrist + # Screw axis of wrist frame wrt base frame + self.S_BR = np.array([0, 1, 0, -self.measurements["wrist"][2], 0, self.measurements["wrist"][0]]) + + # 0-position origin to centroid transform + self.X_RoRc = self._create_translation_matrix(x=0.0035, y=-0.002) + + # 0-position wrist frame pose wrt base + self.X_BR = self._create_translation_matrix( + x=self.measurements["wrist"][0], + y=self.measurements["wrist"][1], + z=self.measurements["wrist"][2], + ) + + # Forearm + # Screw axis of forearm frame wrt base frame + self.S_BF = np.array( + [ + 0, + 1, + 0, + -self.measurements["forearm"][2], + 0, + self.measurements["forearm"][0], + ] + ) + + # Forearm origin + centroid transform + self.X_FoFc = self._create_translation_matrix(x=0.036) # spellchecker:disable-line + + # 0-position forearm frame pose wrt base + self.X_BF = self._create_translation_matrix( + x=self.measurements["forearm"][0], + y=self.measurements["forearm"][1], + z=self.measurements["forearm"][2], + ) + + # Humerus + # Screw axis of humerus frame wrt base frame + self.S_BH = np.array( + [ + 0, + -1, + 0, + self.measurements["humerus"][2], + 0, + -self.measurements["humerus"][0], + ] + ) + + # Humerus origin to centroid transform + self.X_HoHc = self._create_translation_matrix(x=0.0475) + + # 0-position humerus frame pose wrt base + self.X_BH = self._create_translation_matrix( + x=self.measurements["humerus"][0], + y=self.measurements["humerus"][1], + z=self.measurements["humerus"][2], + ) + + # Shoulder + # Screw axis of shoulder frame wrt Base frame + self.S_BS = np.array([0, 0, -1, 0, 0, 0]) + + # Shoulder origin to centroid transform + self.X_SoSc = self._create_translation_matrix(x=-0.017, z=0.0235) + + # 0-position shoulder frame pose wrt base + self.X_BS = self._create_translation_matrix( + x=self.measurements["shoulder"][0], + y=self.measurements["shoulder"][1], + z=self.measurements["shoulder"][2], + ) + + # Base + # Base origin to centroid transform + self.X_BoBc = self._create_translation_matrix(y=0.015) + + # World to base transform + self.X_WoBo = self._create_translation_matrix( + x=self.measurements["base"][0], + y=self.measurements["base"][1], + z=self.measurements["base"][2], + ) + + # Pre-compute gripper post-multiplication matrix + self._fk_gripper_post = self.X_GoGc @ self.X_BoGo @ self.gripper_X0 + + def fk_base(self): + """Forward kinematics for the base frame.""" + return self.X_WoBo @ self.X_BoBc @ self.base_X0 + + def fk_shoulder(self, robot_pos_deg): + """Forward kinematics for the shoulder frame.""" + robot_pos_rad = robot_pos_deg / 180 * np.pi + return self.X_WoBo @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0]) @ self.X_SoSc @ self.X_BS + + def fk_humerus(self, robot_pos_deg): + """Forward kinematics for the humerus frame.""" + robot_pos_rad = robot_pos_deg / 180 * np.pi + return ( + self.X_WoBo + @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0]) + @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1]) + @ self.X_HoHc + @ self.X_BH + ) + + def fk_forearm(self, robot_pos_deg): + """Forward kinematics for the forearm frame.""" + robot_pos_rad = robot_pos_deg / 180 * np.pi + return ( + self.X_WoBo + @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0]) + @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1]) + @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2]) + @ self.X_FoFc # spellchecker:disable-line + @ self.X_BF + ) + + def fk_wrist(self, robot_pos_deg): + """Forward kinematics for the wrist frame.""" + robot_pos_rad = robot_pos_deg / 180 * np.pi + return ( + self.X_WoBo + @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0]) + @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1]) + @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2]) + @ screw_axis_to_transform(self.S_BR, robot_pos_rad[3]) + @ self.X_RoRc + @ self.X_BR + @ self.wrist_X0 + ) + + def fk_gripper(self, robot_pos_deg): + """Forward kinematics for the gripper frame.""" + robot_pos_rad = robot_pos_deg / 180 * np.pi + return ( + self.X_WoBo + @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0]) + @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1]) + @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2]) + @ screw_axis_to_transform(self.S_BR, robot_pos_rad[3]) + @ screw_axis_to_transform(self.S_BG, robot_pos_rad[4]) + @ self._fk_gripper_post + ) + + def fk_gripper_tip(self, robot_pos_deg): + """Forward kinematics for the gripper tip frame.""" + robot_pos_rad = robot_pos_deg / 180 * np.pi + return ( + self.X_WoBo + @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0]) + @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1]) + @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2]) + @ screw_axis_to_transform(self.S_BR, robot_pos_rad[3]) + @ screw_axis_to_transform(self.S_BG, robot_pos_rad[4]) + @ self.X_GoGt + @ self.X_BoGo + @ self.gripper_X0 + ) + + def compute_jacobian(self, robot_pos_deg, fk_func=None): + """Finite differences to compute the Jacobian. + J(i, j) represents how the ith component of the end-effector's velocity changes wrt a small change + in the jth joint's velocity. + + Args: + robot_pos_deg: Current joint positions in degrees + fk_func: Forward kinematics function to use (defaults to fk_gripper) + """ + if fk_func is None: + fk_func = self.fk_gripper + + eps = 1e-8 + jac = np.zeros(shape=(6, 5)) + delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64) + for el_ix in range(len(robot_pos_deg[:-1])): + delta *= 0 + delta[el_ix] = eps / 2 + Sdot = ( + pose_difference_se3( + fk_func(robot_pos_deg[:-1] + delta), + fk_func(robot_pos_deg[:-1] - delta), + ) + / eps + ) + jac[:, el_ix] = Sdot + return jac + + def compute_positional_jacobian(self, robot_pos_deg, fk_func=None): + """Finite differences to compute the positional Jacobian. + J(i, j) represents how the ith component of the end-effector's position changes wrt a small change + in the jth joint's velocity. + + Args: + robot_pos_deg: Current joint positions in degrees + fk_func: Forward kinematics function to use (defaults to fk_gripper) + """ + if fk_func is None: + fk_func = self.fk_gripper + + eps = 1e-8 + jac = np.zeros(shape=(3, 5)) + delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64) + for el_ix in range(len(robot_pos_deg[:-1])): + delta *= 0 + delta[el_ix] = eps / 2 + Sdot = ( + fk_func(robot_pos_deg[:-1] + delta)[:3, 3] - fk_func(robot_pos_deg[:-1] - delta)[:3, 3] + ) / eps + jac[:, el_ix] = Sdot + return jac + + def ik(self, current_joint_state, desired_ee_pose, position_only=True, fk_func=None): + """Inverse kinematics using gradient descent. + + Args: + current_joint_state: Initial joint positions in degrees + desired_ee_pose: Target end-effector pose as a 4x4 transformation matrix + position_only: If True, only match end-effector position, not orientation + fk_func: Forward kinematics function to use (defaults to fk_gripper) + + Returns: + Joint positions in degrees that achieve the desired end-effector pose + """ + if fk_func is None: + fk_func = self.fk_gripper + + # Do gradient descent. + max_iterations = 5 + learning_rate = 1 + for _ in range(max_iterations): + current_ee_pose = fk_func(current_joint_state) + if not position_only: + error = se3_error(desired_ee_pose, current_ee_pose) + jac = self.compute_jacobian(current_joint_state, fk_func) + else: + error = desired_ee_pose[:3, 3] - current_ee_pose[:3, 3] + jac = self.compute_positional_jacobian(current_joint_state, fk_func) + delta_angles = np.linalg.pinv(jac) @ error + current_joint_state[:-1] += learning_rate * delta_angles + + if np.linalg.norm(error) < 5e-3: + return current_joint_state + return current_joint_state + + +if __name__ == "__main__": + import time + + def run_test(robot_type): + """Run test suite for a specific robot type.""" + print(f"\n--- Testing {robot_type.upper()} Robot ---") + + # Initialize kinematics for this robot + robot = RobotKinematics(robot_type) + + # Test 1: Forward kinematics consistency + print("Test 1: Forward kinematics consistency") + test_angles = np.array([30, 45, -30, 20, 10, 0]) # Example joint angles in degrees + + # Calculate FK for different joints + shoulder_pose = robot.fk_shoulder(test_angles) + humerus_pose = robot.fk_humerus(test_angles) + forearm_pose = robot.fk_forearm(test_angles) + wrist_pose = robot.fk_wrist(test_angles) + gripper_pose = robot.fk_gripper(test_angles) + gripper_tip_pose = robot.fk_gripper_tip(test_angles) + + # Check that poses form a consistent kinematic chain (positions should be progressively further from origin) + distances = [ + np.linalg.norm(shoulder_pose[:3, 3]), + np.linalg.norm(humerus_pose[:3, 3]), + np.linalg.norm(forearm_pose[:3, 3]), + np.linalg.norm(wrist_pose[:3, 3]), + np.linalg.norm(gripper_pose[:3, 3]), + np.linalg.norm(gripper_tip_pose[:3, 3]), + ] + + # Check if distances generally increase along the chain + is_consistent = all(distances[i] <= distances[i + 1] for i in range(len(distances) - 1)) + print(f" Pose distances from origin: {[round(d, 3) for d in distances]}") + print(f" Kinematic chain consistency: {'PASSED' if is_consistent else 'FAILED'}") + + # Test 2: Jacobian computation + print("Test 2: Jacobian computation") + jacobian = robot.compute_jacobian(test_angles) + positional_jacobian = robot.compute_positional_jacobian(test_angles) + + # Check shapes + jacobian_shape_ok = jacobian.shape == (6, 5) + pos_jacobian_shape_ok = positional_jacobian.shape == (3, 5) + + print(f" Jacobian shape: {'PASSED' if jacobian_shape_ok else 'FAILED'}") + print(f" Positional Jacobian shape: {'PASSED' if pos_jacobian_shape_ok else 'FAILED'}") + + # Test 3: Inverse kinematics + print("Test 3: Inverse kinematics (position only)") + + # Generate target pose from known joint angles + original_angles = np.array([10, 20, 30, -10, 5, 0]) + target_pose = robot.fk_gripper(original_angles) + + # Start IK from a different position + initial_guess = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + + # Measure IK performance + start_time = time.time() + computed_angles = robot.ik(initial_guess.copy(), target_pose) + ik_time = time.time() - start_time + + # Compute resulting pose from IK solution + result_pose = robot.fk_gripper(computed_angles) + + # Calculate position error + pos_error = np.linalg.norm(target_pose[:3, 3] - result_pose[:3, 3]) + passed = pos_error < 0.01 # Accept errors less than 1cm + + print(f" IK computation time: {ik_time:.4f} seconds") + print(f" Position error: {pos_error:.4f}") + print(f" IK position accuracy: {'PASSED' if passed else 'FAILED'}") + + return is_consistent and jacobian_shape_ok and pos_jacobian_shape_ok and passed + + # Run tests for all robot types + results = {} + for robot_type in ["koch", "so100", "moss"]: + results[robot_type] = run_test(robot_type) + + # Print overall summary + print("\n=== Test Summary ===") + all_passed = all(results.values()) + for robot_type, passed in results.items(): + print(f"{robot_type.upper()}: {'PASSED' if passed else 'FAILED'}") + print(f"\nOverall: {'ALL TESTS PASSED' if all_passed else 'SOME TESTS FAILED'}") \ No newline at end of file diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index aa53dc97..b1d32bbd 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -798,8 +798,11 @@ class MotorsBus(abc.ABC): if drive_mode: val *= -1 val += homing_offset - normalized_values[id_] = val / (self.model_resolution_table[self.motors[motor].model] // 2) * 180 - # TODO(alibers): degree mode + normalized_values[id_] = ( + val / (self.model_resolution_table[self.motors[motor].model] // 2) * 180 + ) + else: + # TODO(alibers): velocity and degree modes raise NotImplementedError return normalized_values @@ -827,7 +830,9 @@ class MotorsBus(abc.ABC): unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_) elif self.motors[motor].norm_mode is MotorNormMode.DEGREE: homing_offset = self.calibration[motor].homing_offset - unnormalized_values[id_] = int(val / 180 * (self.model_resolution_table[self.motors[motor].model] // 2)) + unnormalized_values[id_] = int( + val / 180 * (self.model_resolution_table[self.motors[motor].model] // 2) + ) unnormalized_values[id_] -= homing_offset if drive_mode: unnormalized_values[id_] *= -1 diff --git a/lerobot/common/robots/so100_follower_end_effector/__init__.py b/lerobot/common/robots/so100_follower_end_effector/__init__.py new file mode 100644 index 00000000..7df37901 --- /dev/null +++ b/lerobot/common/robots/so100_follower_end_effector/__init__.py @@ -0,0 +1,2 @@ +from .config_so100_follower_end_effector import SO100FollowerEndEffectorConfig +from .so100_follower_end_effector import SO100FollowerEndEffector \ No newline at end of file diff --git a/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py b/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py new file mode 100644 index 00000000..e828101f --- /dev/null +++ b/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, List, Optional + +from dataclasses import dataclass, field +from lerobot.common.cameras import CameraConfig +from ..config import RobotConfig + + +@RobotConfig.register_subclass("so100_follower_end_effector") +@dataclass +class SO100FollowerEndEffectorConfig(RobotConfig): + """Configuration for the SO100FollowerEndEffector robot.""" + # Port to connect to the arm + port: str + + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None + + # cameras + cameras: dict[str, CameraConfig] = field(default_factory=dict) + + # Default bounds for the end-effector position (in meters) + bounds: Dict[str, List[float]] = field(default_factory=lambda: { + "min": [0.1, -0.3, 0.0], # min x, y, z + "max": [0.4, 0.3, 0.4], # max x, y, z + } ) \ No newline at end of file diff --git a/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py b/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py new file mode 100644 index 00000000..9a691f3f --- /dev/null +++ b/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import numpy as np +from typing import Any, Dict + +from lerobot.common.errors import DeviceNotConnectedError +from lerobot.common.model.kinematics import RobotKinematics + +from ..so100_follower import SO100Follower +from .config_so100_follower_end_effector import SO100FollowerEndEffectorConfig +from lerobot.common.motors import Motor, MotorNormMode +from lerobot.common.motors.feetech import FeetechMotorsBus + +logger = logging.getLogger(__name__) + + +class SO100FollowerEndEffector(SO100Follower): + """ + SO100Follower robot with end-effector space control. + + This robot inherits from SO100Follower but transforms actions from + end-effector space to joint space before sending them to the motors. + """ + + config_class = SO100FollowerEndEffectorConfig + name = "so100_follower_end_effector" + + def __init__(self, config: SO100FollowerEndEffectorConfig): + super().__init__(config) + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pan": Motor(1, "sts3215", MotorNormMode.DEGREE), + "shoulder_lift": Motor(2, "sts3215", MotorNormMode.DEGREE), + "elbow_flex": Motor(3, "sts3215", MotorNormMode.DEGREE), + "wrist_flex": Motor(4, "sts3215", MotorNormMode.DEGREE), + "wrist_roll": Motor(5, "sts3215", MotorNormMode.DEGREE), + "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + ) + + self.config = config + + # Initialize the kinematics module for the so100 robot + self.kinematics = RobotKinematics(robot_type="so100") + + # Set the forward kinematics function + self.fk_function = self.kinematics.fk_gripper_tip + + # Store the bounds for end-effector position + self.bounds = self.config.bounds + + # Store the joint mins and maxs + self.joint_mins = None + self.joint_maxs = None + + @property + def action_features(self) -> Dict[str, Any]: + """ + Define action features for end-effector control. + Returns dictionary with dtype, shape, and names. + """ + return { + "dtype": "float32", + "shape": (4,), + "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2, "gripper": 3}, + } + + def connect(self): + super().connect() + self.joint_mins = self.bus.sync_read("Min_Position_Limit") + self.joint_maxs = self.bus.sync_read("Max_Position_Limit") + + def send_action(self, action: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform action from end-effector space to joint space and send to motors. + + Args: + action: Dictionary with keys 'delta_x', 'delta_y', 'delta_z' for end-effector control + or a numpy array with [delta_x, delta_y, delta_z] + + Returns: + The joint-space action that was sent to the motors + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Convert action to numpy array if not already + if isinstance(action, dict): + if all(k in action for k in ["delta_x", "delta_y", "delta_z", "gripper"]): + action = np.array([action["delta_x"], action["delta_y"], action["delta_z"], action["gripper"]], dtype=np.float32) + else: + logger.warning(f"Expected action keys 'delta_x', 'delta_y', 'delta_z', got {list(action.keys())}") + action = np.zeros(4, dtype=np.float32) + + + # Read current joint positions + current_joint_pos = self.bus.sync_read("Present_Position") + + # Convert dict to ordered list without gripper + joint_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"] + # Convert the joint positions from min-max to degrees + current_joint_pos = np.array([current_joint_pos[name] for name in joint_names]) + + # Calculate current end-effector position using forward kinematics + current_ee_pos = self.fk_function(current_joint_pos) + + # Set desired end-effector position by adding delta + desired_ee_pos = np.eye(4) + desired_ee_pos[:3, :3] = current_ee_pos[:3, :3] # Keep orientation + + # Add delta to position and clip to bounds + desired_ee_pos[:3, 3] = current_ee_pos[:3, 3] + action[:3] + if self.bounds is not None: + desired_ee_pos[:3, 3] = np.clip( + desired_ee_pos[:3, 3], + self.bounds["min"], + self.bounds["max"], + ) + + # Compute inverse kinematics to get joint positions + target_joint_values_in_degrees = self.kinematics.ik( + current_joint_pos, + desired_ee_pos, + position_only=True, + fk_func=self.fk_function, + ) + + # Create joint space action dictionary + joint_action = { + f"{joint_names[i]}.pos": target_joint_values_in_degrees[i] + for i in range(len(joint_names)-1) # Exclude gripper + } + + # Handle gripper separately if included in action + if "gripper.pos" in action: + joint_action["gripper.pos"] = action["gripper.pos"] + else: + # Keep current gripper position + joint_action["gripper.pos"] = current_joint_pos[-1] + + import time + time.sleep(0.001) + # Send joint space action to parent class + return super().send_action(joint_action) \ No newline at end of file diff --git a/lerobot/common/robots/utils.py b/lerobot/common/robots/utils.py index 587d7524..dd100225 100644 --- a/lerobot/common/robots/utils.py +++ b/lerobot/common/robots/utils.py @@ -14,12 +14,10 @@ import logging from pprint import pformat -from typing import Type from lerobot.common.robots import RobotConfig from .robot import Robot -from .robot_wrapper import RobotWrapper def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: @@ -35,7 +33,9 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: return SO100FollowerConfig(**kwargs) elif robot_type == "so100_follower_end_effector": - from .so100_follower_end_effector.config_so100_follower_end_effector import SO100FollowerEndEffectorConfig + from .so100_follower_end_effector.config_so100_follower_end_effector import ( + SO100FollowerEndEffectorConfig, + ) return SO100FollowerEndEffectorConfig(**kwargs) elif robot_type == "stretch": @@ -132,8 +132,3 @@ def get_arm_id(name, arm_type): """ return f"{name}_{arm_type}" -def convert_joint_m100_100_to_degrees(joint_positions: dict[str, float], mins: dict[str, float], maxs: dict[str, float]) -> dict[str, float]: - return {key: ((value + 100) / 200) * (maxs[key] - mins[key]) + mins[key] for key, value in joint_positions.items()} - -def convert_joint_degrees_to_m100_100(joint_positions: dict[str, float], mins: dict[str, float], maxs: dict[str, float]) -> dict[str, float]: - return {key: (value - mins[key]) / (maxs[key] - mins[key]) * 200 - 100 for key, value in joint_positions.items()} diff --git a/lerobot/common/teleoperators/gamepad/__init__.py b/lerobot/common/teleoperators/gamepad/__init__.py new file mode 100644 index 00000000..a8b47eaf --- /dev/null +++ b/lerobot/common/teleoperators/gamepad/__init__.py @@ -0,0 +1,4 @@ +from .configuration_gamepad import GamepadTeleopConfig +from .teleop_gamepad import GamepadTeleop + +__all__ = ["GamepadTeleopConfig", "GamepadTeleop"] diff --git a/lerobot/common/teleoperators/gamepad/configuration_gamepad.py b/lerobot/common/teleoperators/gamepad/configuration_gamepad.py new file mode 100644 index 00000000..10cdf6aa --- /dev/null +++ b/lerobot/common/teleoperators/gamepad/configuration_gamepad.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("gamepad") +@dataclass +class GamepadTeleopConfig(TeleoperatorConfig): + # TODO(Steven): Consider setting in here the keys that we want to capture/listen + mock: bool = False + + x_step_size: float = 0.05 + y_step_size: float = 0.05 + z_step_size: float = 0.05 + + use_gripper: bool = True diff --git a/lerobot/common/teleoperators/gamepad/gamepad_utils.py b/lerobot/common/teleoperators/gamepad/gamepad_utils.py new file mode 100644 index 00000000..cb9ba0b1 --- /dev/null +++ b/lerobot/common/teleoperators/gamepad/gamepad_utils.py @@ -0,0 +1,719 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import sys +import time + +import numpy as np +import torch + +from lerobot.common.utils.robot_utils import busy_wait +from lerobot.common.utils.utils import init_logging +from lerobot.common.robots.kinematics import RobotKinematics + + +class InputController: + """Base class for input controllers that generate motion deltas.""" + + def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01): + """ + Initialize the controller. + + Args: + x_step_size: Base movement step size in meters + y_step_size: Base movement step size in meters + z_step_size: Base movement step size in meters + """ + self.x_step_size = x_step_size + self.y_step_size = y_step_size + self.z_step_size = z_step_size + self.running = True + self.episode_end_status = None # None, "success", or "failure" + self.intervention_flag = False + self.open_gripper_command = False + self.close_gripper_command = False + + def start(self): + """Start the controller and initialize resources.""" + pass + + def stop(self): + """Stop the controller and release resources.""" + pass + + def get_deltas(self): + """Get the current movement deltas (dx, dy, dz) in meters.""" + return 0.0, 0.0, 0.0 + + def should_quit(self): + """Return True if the user has requested to quit.""" + return not self.running + + def update(self): + """Update controller state - call this once per frame.""" + pass + + def __enter__(self): + """Support for use in 'with' statements.""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Ensure resources are released when exiting 'with' block.""" + self.stop() + + def get_episode_end_status(self): + """ + Get the current episode end status. + + Returns: + None if episode should continue, "success" or "failure" otherwise + """ + status = self.episode_end_status + self.episode_end_status = None # Reset after reading + return status + + def should_intervene(self): + """Return True if intervention flag was set.""" + return self.intervention_flag + + def gripper_command(self): + """Return the current gripper command.""" + if self.open_gripper_command == self.close_gripper_command: + return "no-op" + elif self.open_gripper_command: + return "open" + elif self.close_gripper_command: + return "close" + + +class KeyboardController(InputController): + """Generate motion deltas from keyboard input.""" + + def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01): + super().__init__(x_step_size, y_step_size, z_step_size) + self.key_states = { + "forward_x": False, + "backward_x": False, + "forward_y": False, + "backward_y": False, + "forward_z": False, + "backward_z": False, + "quit": False, + "success": False, + "failure": False, + } + self.listener = None + + def start(self): + """Start the keyboard listener.""" + from pynput import keyboard + + def on_press(key): + try: + if key == keyboard.Key.up: + self.key_states["forward_x"] = True + elif key == keyboard.Key.down: + self.key_states["backward_x"] = True + elif key == keyboard.Key.left: + self.key_states["forward_y"] = True + elif key == keyboard.Key.right: + self.key_states["backward_y"] = True + elif key == keyboard.Key.shift: + self.key_states["backward_z"] = True + elif key == keyboard.Key.shift_r: + self.key_states["forward_z"] = True + elif key == keyboard.Key.esc: + self.key_states["quit"] = True + self.running = False + return False + elif key == keyboard.Key.enter: + self.key_states["success"] = True + self.episode_end_status = "success" + elif key == keyboard.Key.backspace: + self.key_states["failure"] = True + self.episode_end_status = "failure" + except AttributeError: + pass + + def on_release(key): + try: + if key == keyboard.Key.up: + self.key_states["forward_x"] = False + elif key == keyboard.Key.down: + self.key_states["backward_x"] = False + elif key == keyboard.Key.left: + self.key_states["forward_y"] = False + elif key == keyboard.Key.right: + self.key_states["backward_y"] = False + elif key == keyboard.Key.shift: + self.key_states["backward_z"] = False + elif key == keyboard.Key.shift_r: + self.key_states["forward_z"] = False + elif key == keyboard.Key.enter: + self.key_states["success"] = False + elif key == keyboard.Key.backspace: + self.key_states["failure"] = False + except AttributeError: + pass + + self.listener = keyboard.Listener(on_press=on_press, on_release=on_release) + self.listener.start() + + print("Keyboard controls:") + print(" Arrow keys: Move in X-Y plane") + print(" Shift and Shift_R: Move in Z axis") + print(" Enter: End episode with SUCCESS") + print(" Backspace: End episode with FAILURE") + print(" ESC: Exit") + + def stop(self): + """Stop the keyboard listener.""" + if self.listener and self.listener.is_alive(): + self.listener.stop() + + def get_deltas(self): + """Get the current movement deltas from keyboard state.""" + delta_x = delta_y = delta_z = 0.0 + + if self.key_states["forward_x"]: + delta_x += self.x_step_size + if self.key_states["backward_x"]: + delta_x -= self.x_step_size + if self.key_states["forward_y"]: + delta_y += self.y_step_size + if self.key_states["backward_y"]: + delta_y -= self.y_step_size + if self.key_states["forward_z"]: + delta_z += self.z_step_size + if self.key_states["backward_z"]: + delta_z -= self.z_step_size + + return delta_x, delta_y, delta_z + + def should_quit(self): + """Return True if ESC was pressed.""" + return self.key_states["quit"] + + def should_save(self): + """Return True if Enter was pressed (save episode).""" + return self.key_states["success"] or self.key_states["failure"] + + +class GamepadController(InputController): + """Generate motion deltas from gamepad input.""" + + def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01, deadzone=0.1): + super().__init__(x_step_size, y_step_size, z_step_size) + self.deadzone = deadzone + self.joystick = None + self.intervention_flag = False + + def start(self): + """Initialize pygame and the gamepad.""" + import pygame + + pygame.init() + pygame.joystick.init() + + if pygame.joystick.get_count() == 0: + logging.error("No gamepad detected. Please connect a gamepad and try again.") + self.running = False + return + + self.joystick = pygame.joystick.Joystick(0) + self.joystick.init() + logging.info(f"Initialized gamepad: {self.joystick.get_name()}") + + print("Gamepad controls:") + print(" Left analog stick: Move in X-Y plane") + print(" Right analog stick (vertical): Move in Z axis") + print(" B/Circle button: Exit") + print(" Y/Triangle button: End episode with SUCCESS") + print(" A/Cross button: End episode with FAILURE") + print(" X/Square button: Rerecord episode") + + def stop(self): + """Clean up pygame resources.""" + import pygame + + if pygame.joystick.get_init(): + if self.joystick: + self.joystick.quit() + pygame.joystick.quit() + pygame.quit() + + def update(self): + """Process pygame events to get fresh gamepad readings.""" + import pygame + + for event in pygame.event.get(): + if event.type == pygame.JOYBUTTONDOWN: + if event.button == 3: + self.episode_end_status = "success" + # A button (1) for failure + elif event.button == 1: + self.episode_end_status = "failure" + # X button (0) for rerecord + elif event.button == 0: + self.episode_end_status = "rerecord_episode" + + # RB button (6) for closing gripper + elif event.button == 6: + self.close_gripper_command = True + + # LT button (7) for opening gripper + elif event.button == 7: + self.open_gripper_command = True + + # Reset episode status on button release + elif event.type == pygame.JOYBUTTONUP: + if event.button in [0, 2, 3]: + self.episode_end_status = None + + elif event.button == 6: + self.close_gripper_command = False + + elif event.button == 7: + self.open_gripper_command = False + + # Check for RB button (typically button 5) for intervention flag + if self.joystick.get_button(5): + self.intervention_flag = True + else: + self.intervention_flag = False + + def get_deltas(self): + """Get the current movement deltas from gamepad state.""" + import pygame + + try: + # Read joystick axes + # Left stick X and Y (typically axes 0 and 1) + x_input = self.joystick.get_axis(0) # Left/Right + y_input = self.joystick.get_axis(1) # Up/Down (often inverted) + + # Right stick Y (typically axis 3 or 4) + z_input = self.joystick.get_axis(3) # Up/Down for Z + + # Apply deadzone to avoid drift + x_input = 0 if abs(x_input) < self.deadzone else x_input + y_input = 0 if abs(y_input) < self.deadzone else y_input + z_input = 0 if abs(z_input) < self.deadzone else z_input + + # Calculate deltas (note: may need to invert axes depending on controller) + delta_x = -y_input * self.y_step_size # Forward/backward + delta_y = -x_input * self.x_step_size # Left/right + delta_z = -z_input * self.z_step_size # Up/down + + return delta_x, delta_y, delta_z + + except pygame.error: + logging.error("Error reading gamepad. Is it still connected?") + return 0.0, 0.0, 0.0 + + +class GamepadControllerHID(InputController): + """Generate motion deltas from gamepad input using HIDAPI.""" + + def __init__( + self, + x_step_size=0.01, + y_step_size=0.01, + z_step_size=0.01, + deadzone=0.1, + vendor_id=0x046D, + product_id=0xC219, + ): + """ + Initialize the HID gamepad controller. + + Args: + step_size: Base movement step size in meters + z_scale: Scaling factor for Z-axis movement + deadzone: Joystick deadzone to prevent drift + vendor_id: USB vendor ID of the gamepad (default: Logitech) + product_id: USB product ID of the gamepad (default: RumblePad 2) + """ + super().__init__(x_step_size, y_step_size, z_step_size) + self.deadzone = deadzone + self.vendor_id = vendor_id + self.product_id = product_id + self.device = None + self.device_info = None + + # Movement values (normalized from -1.0 to 1.0) + self.left_x = 0.0 + self.left_y = 0.0 + self.right_x = 0.0 + self.right_y = 0.0 + + # Button states + self.buttons = {} + self.quit_requested = False + self.save_requested = False + + def find_device(self): + """Look for the gamepad device by vendor and product ID.""" + import hid + + devices = hid.enumerate() + for device in devices: + if device["vendor_id"] == self.vendor_id and device["product_id"] == self.product_id: + logging.info(f"Found gamepad: {device.get('product_string', 'Unknown')}") + return device + + logging.error( + f"No gamepad with vendor ID 0x{self.vendor_id:04X} and product ID 0x{self.product_id:04X} found" + ) + return None + + def start(self): + """Connect to the gamepad using HIDAPI.""" + import hid + + self.device_info = self.find_device() + if not self.device_info: + self.running = False + return + + try: + logging.info(f"Connecting to gamepad at path: {self.device_info['path']}") + self.device = hid.device() + self.device.open_path(self.device_info["path"]) + self.device.set_nonblocking(1) + + manufacturer = self.device.get_manufacturer_string() + product = self.device.get_product_string() + logging.info(f"Connected to {manufacturer} {product}") + + logging.info("Gamepad controls (HID mode):") + logging.info(" Left analog stick: Move in X-Y plane") + logging.info(" Right analog stick: Move in Z axis (vertical)") + logging.info(" Button 1/B/Circle: Exit") + logging.info(" Button 2/A/Cross: End episode with SUCCESS") + logging.info(" Button 3/X/Square: End episode with FAILURE") + + except OSError as e: + logging.error(f"Error opening gamepad: {e}") + logging.error("You might need to run this with sudo/admin privileges on some systems") + self.running = False + + def stop(self): + """Close the HID device connection.""" + if self.device: + self.device.close() + self.device = None + + def update(self): + """ + Read and process the latest gamepad data. + Due to an issue with the HIDAPI, we need to read the read the device several times in order to get a stable reading + """ + for _ in range(10): + self._update() + + def _update(self): + """Read and process the latest gamepad data.""" + if not self.device or not self.running: + return + + try: + # Read data from the gamepad + data = self.device.read(64) + # Interpret gamepad data - this will vary by controller model + # These offsets are for the Logitech RumblePad 2 + if data and len(data) >= 8: + # Normalize joystick values from 0-255 to -1.0-1.0 + self.left_x = (data[1] - 128) / 128.0 + self.left_y = (data[2] - 128) / 128.0 + self.right_x = (data[3] - 128) / 128.0 + self.right_y = (data[4] - 128) / 128.0 + + # Apply deadzone + self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x + self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y + self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x + self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y + + # Parse button states (byte 5 in the Logitech RumblePad 2) + buttons = data[5] + + # Check if RB is pressed then the intervention flag should be set + self.intervention_flag = data[6] in [2, 6, 10, 14] + + # Check if RT is pressed + self.open_gripper_command = data[6] in [8, 10, 12] + + # Check if LT is pressed + self.close_gripper_command = data[6] in [4, 6, 12] + + # Check if Y/Triangle button (bit 7) is pressed for saving + # Check if X/Square button (bit 5) is pressed for failure + # Check if A/Cross button (bit 4) is pressed for rerecording + if buttons & 1 << 7: + self.episode_end_status = "success" + elif buttons & 1 << 5: + self.episode_end_status = "failure" + elif buttons & 1 << 4: + self.episode_end_status = "rerecord_episode" + else: + self.episode_end_status = None + + except OSError as e: + logging.error(f"Error reading from gamepad: {e}") + + def get_deltas(self): + """Get the current movement deltas from gamepad state.""" + # Calculate deltas - invert as needed based on controller orientation + delta_x = -self.left_y * self.x_step_size # Forward/backward + delta_y = -self.left_x * self.y_step_size # Left/right + delta_z = -self.right_y * self.z_step_size # Up/down + + return delta_x, delta_y, delta_z + + def should_quit(self): + """Return True if quit button was pressed.""" + return self.quit_requested + + def should_save(self): + """Return True if save button was pressed.""" + return self.save_requested + + +def test_forward_kinematics(robot, fps=10): + logging.info("Testing Forward Kinematics") + timestep = time.perf_counter() + kinematics = RobotKinematics(robot.robot_type) + while time.perf_counter() - timestep < 60.0: + loop_start_time = time.perf_counter() + robot.teleop_step() + obs = robot.capture_observation() + joint_positions = obs["observation.state"].cpu().numpy() + ee_pos = kinematics.fk_gripper_tip(joint_positions) + logging.info(f"EE Position: {ee_pos[:3, 3]}") + busy_wait(1 / fps - (time.perf_counter() - loop_start_time)) + + +def test_inverse_kinematics(robot, fps=10): + logging.info("Testing Inverse Kinematics") + timestep = time.perf_counter() + while time.perf_counter() - timestep < 60.0: + loop_start_time = time.perf_counter() + obs = robot.capture_observation() + joint_positions = obs["observation.state"].cpu().numpy() + ee_pos = RobotKinematics.fk_gripper_tip(joint_positions) + desired_ee_pos = ee_pos + target_joint_state = RobotKinematics.ik(joint_positions, desired_ee_pos, position_only=True) + robot.send_action(torch.from_numpy(target_joint_state)) + logging.info(f"Target Joint State: {target_joint_state}") + busy_wait(1 / fps - (time.perf_counter() - loop_start_time)) + + +def teleoperate_inverse_kinematics_with_leader(robot, fps=10): + logging.info("Testing Inverse Kinematics") + kinematics = RobotKinematics(robot.robot_type) + timestep = time.perf_counter() + while time.perf_counter() - timestep < 60.0: + loop_start_time = time.perf_counter() + obs = robot.capture_observation() + joint_positions = obs["observation.state"].cpu().numpy() + ee_pos = kinematics.fk_gripper_tip(joint_positions) + + leader_joint_positions = robot.leader_arms["main"].read("Present_Position") + leader_ee = kinematics.fk_gripper_tip(leader_joint_positions) + + desired_ee_pos = leader_ee + target_joint_state = kinematics.ik( + joint_positions, desired_ee_pos, position_only=True, fk_func=kinematics.fk_gripper_tip + ) + robot.send_action(torch.from_numpy(target_joint_state)) + logging.info(f"Leader EE: {leader_ee[:3, 3]}, Follower EE: {ee_pos[:3, 3]}") + busy_wait(1 / fps - (time.perf_counter() - loop_start_time)) + + +def teleoperate_delta_inverse_kinematics_with_leader(robot, fps=10): + logging.info("Testing Delta End-Effector Control") + timestep = time.perf_counter() + + # Initial position capture + obs = robot.capture_observation() + joint_positions = obs["observation.state"].cpu().numpy() + + kinematics = RobotKinematics(robot.robot_type) + + leader_joint_positions = robot.leader_arms["main"].read("Present_Position") + initial_leader_ee = kinematics.fk_gripper_tip(leader_joint_positions) + + desired_ee_pos = np.diag(np.ones(4)) + joint_positions = robot.follower_arms["main"].read("Present_Position") + fixed_ee_pos = kinematics.fk_gripper_tip(joint_positions) + + while time.perf_counter() - timestep < 60.0: + loop_start_time = time.perf_counter() + + # Get leader state for teleoperation + leader_joint_positions = robot.leader_arms["main"].read("Present_Position") + leader_ee = kinematics.fk_gripper_tip(leader_joint_positions) + + # Get current state + # obs = robot.capture_observation() + # joint_positions = obs["observation.state"].cpu().numpy() + joint_positions = robot.follower_arms["main"].read("Present_Position") + current_ee_pos = kinematics.fk_gripper_tip(joint_positions) + + # Calculate delta between leader and follower end-effectors + # Scaling factor can be adjusted for sensitivity + scaling_factor = 1.0 + ee_delta = -np.clip((leader_ee - initial_leader_ee) * scaling_factor, -0.05, 0.05) + + # Apply delta to current position + desired_ee_pos[0, 3] = fixed_ee_pos[0, 3] # current_ee_pos[0, 3] + ee_delta[0, 3] * 0 + desired_ee_pos[1, 3] = fixed_ee_pos[1, 3] # current_ee_pos[1, 3] + ee_delta[1, 3] * 0 + desired_ee_pos[2, 3] = current_ee_pos[2, 3] - ee_delta[2, 3] + + # Compute joint targets via inverse kinematics + target_joint_state = kinematics.ik( + joint_positions, desired_ee_pos, position_only=True, fk_func=kinematics.fk_gripper_tip + ) + + initial_leader_ee = leader_ee.copy() + + # Send command to robot + robot.send_action(torch.from_numpy(target_joint_state)) + + # Logging + logging.info(f"Current EE: {current_ee_pos[:3, 3]}, Desired EE: {desired_ee_pos[:3, 3]}") + logging.info(f"Delta EE: {ee_delta[:3, 3]}") + + busy_wait(1 / fps - (time.perf_counter() - loop_start_time)) + + +def teleoperate_delta_inverse_kinematics(robot, controller, fps=10, bounds=None, fk_func=None): + """ + Control a robot using delta end-effector movements from any input controller. + + Args: + robot: Robot instance to control + controller: InputController instance (keyboard, gamepad, etc.) + fps: Control frequency in Hz + bounds: Optional position limits + fk_func: Forward kinematics function to use + """ + if fk_func is None: + fk_func = RobotKinematics.fk_gripper_tip + + logging.info(f"Testing Delta End-Effector Control with {controller.__class__.__name__}") + + # Initial position capture + obs = robot.capture_observation() + joint_positions = obs["observation.state"].cpu().numpy() + kinematics = RobotKinematics(robot.robot_type) + current_ee_pos = kinematics.fk_gripper_tip(joint_positions) + + # Initialize desired position with current position + desired_ee_pos = np.eye(4) # Identity matrix + + timestep = time.perf_counter() + with controller: + while not controller.should_quit() and time.perf_counter() - timestep < 60.0: + loop_start_time = time.perf_counter() + + # Process input events + controller.update() + + # Get current robot state + joint_positions = robot.follower_arms["main"].read("Present_Position") + current_ee_pos = kinematics.fk_gripper_tip(joint_positions) + + # Get movement deltas from the controller + delta_x, delta_y, delta_z = controller.get_deltas() + + # Update desired position + desired_ee_pos[0, 3] = current_ee_pos[0, 3] + delta_x + desired_ee_pos[1, 3] = current_ee_pos[1, 3] + delta_y + desired_ee_pos[2, 3] = current_ee_pos[2, 3] + delta_z + + # Apply bounds if provided + if bounds is not None: + desired_ee_pos[:3, 3] = np.clip(desired_ee_pos[:3, 3], bounds["min"], bounds["max"]) + + # Only send commands if there's actual movement + if any(abs(v) > 0.001 for v in [delta_x, delta_y, delta_z]): + # Compute joint targets via inverse kinematics + target_joint_state = kinematics.ik(joint_positions, desired_ee_pos, position_only=True) + + # Send command to robot + robot.send_action(torch.from_numpy(target_joint_state)) + + busy_wait(1 / fps - (time.perf_counter() - loop_start_time)) + + +def teleoperate_gym_env(env, controller, fps: int = 30): + """ + Control a robot through a gym environment using keyboard inputs. + + Args: + env: A gym environment created with make_robot_env + fps: Target control frequency + """ + + logging.info("Testing Keyboard Control of Gym Environment") + print("Keyboard controls:") + print(" Arrow keys: Move in X-Y plane") + print(" Shift and Shift_R: Move in Z axis") + print(" ESC: Exit") + + # Reset the environment to get initial observation + obs, info = env.reset() + + try: + with controller: + while not controller.should_quit(): + loop_start_time = time.perf_counter() + + # Process input events + controller.update() + + # Get movement deltas from the controller + delta_x, delta_y, delta_z = controller.get_deltas() + + # Create the action vector + action = np.array([delta_x, delta_y, delta_z]) + + # Skip if no movement + if any(abs(v) > 0.001 for v in [delta_x, delta_y, delta_z]): + # Step the environment - pass action as a tensor with intervention flag + action_tensor = torch.from_numpy(action.astype(np.float32)) + obs, reward, terminated, truncated, info = env.step((action_tensor, False)) + + # Log information + logging.info(f"Action: [{delta_x:.4f}, {delta_y:.4f}, {delta_z:.4f}]") + logging.info(f"Reward: {reward}") + + # Reset if episode ended + if terminated or truncated: + logging.info("Episode ended, resetting environment") + obs, info = env.reset() + + # Maintain target frame rate + busy_wait(1 / fps - (time.perf_counter() - loop_start_time)) + + finally: + # Close the environment + env.close() diff --git a/lerobot/common/teleoperators/gamepad/teleop_gamepad.py b/lerobot/common/teleoperators/gamepad/teleop_gamepad.py new file mode 100644 index 00000000..ace9926e --- /dev/null +++ b/lerobot/common/teleoperators/gamepad/teleop_gamepad.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +from queue import Queue +from typing import Any +from enum import IntEnum + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +import numpy as np +from ..teleoperator import Teleoperator +from .configuration_gamepad import GamepadTeleopConfig + +class GripperAction(IntEnum): + CLOSE = 0 + STAY = 1 + OPEN = 2 + +gripper_action_map = { + "close": GripperAction.CLOSE.value, + "open": GripperAction.OPEN.value, + "stay": GripperAction.STAY.value, +} + + +class GamepadTeleop(Teleoperator): + """ + Teleop class to use gamepad inputs for control. + """ + + config_class = GamepadTeleopConfig + name = "gamepad" + + def __init__(self, config: GamepadTeleopConfig): + super().__init__(config) + self.config = config + self.robot_type = config.type + + self.event_queue = Queue() + self.current_pressed = {} + self.listener = None + self.logs = {} + + self.gamepad = None + + @property + def action_features(self) -> dict: + if self.config.use_gripper: + return { + "dtype": "float32", + "shape": (4,), + "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2, "gripper": 3}, + } + else: + return { + "dtype": "float32", + "shape": (3,), + "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2}, + } + + @property + def feedback_features(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + pass + + @property + def is_calibrated(self) -> bool: + return True + + def connect(self) -> None: + # use HidApi for macos + if sys.platform == "darwin": + # NOTE: On macOS, pygame doesn’t reliably detect input from some controllers so we fall back to hidapi + from lerobot.scripts.server.end_effector_control_utils import GamepadControllerHID as Gamepad + else: + from lerobot.scripts.server.end_effector_control_utils import GamepadController as Gamepad + + self.gamepad = Gamepad( + x_step_size=self.config.x_step_size, + y_step_size=self.config.y_step_size, + z_step_size=self.config.z_step_size, + ) + self.gamepad.start() + + def calibrate(self) -> None: + pass + + def configure(self): + pass + + def get_action(self) -> dict[str, Any]: + # Update the controller to get fresh inputs + self.gamepad.update() + + # Get movement deltas from the controller + delta_x, delta_y, delta_z = self.gamepad.get_deltas() + + # Create action from gamepad input + gamepad_action = np.array([delta_x, delta_y, delta_z], dtype=np.float32) + + action_dict = { + "delta_x": gamepad_action[0], + "delta_y": gamepad_action[1], + "delta_z": gamepad_action[2], + } + + gripper_action = None + if self.config.use_gripper: + gripper_command = self.gamepad.gripper_command() + gripper_action = gripper_action_map[gripper_command] + action_dict["gripper"] = gripper_action + + return action_dict + + def send_feedback(self, feedback: dict[str, Any]) -> None: + pass + + def disconnect(self) -> None: + pass diff --git a/lerobot/common/utils/utils.py b/lerobot/common/utils/utils.py index d9f7f78d..c4e5ece5 100644 --- a/lerobot/common/utils/utils.py +++ b/lerobot/common/utils/utils.py @@ -17,10 +17,10 @@ import logging import os import os.path as osp import platform -import subprocess -import time import select +import subprocess import sys +import time from copy import copy, deepcopy from datetime import datetime, timezone from pathlib import Path @@ -254,6 +254,7 @@ def move_cursor_up(lines): """Move the cursor up by a specified number of lines.""" print(f"\033[{lines}A", end="") + class TimerManager: """ Lightweight utility to measure elapsed time. diff --git a/lerobot/teleoperate.py b/lerobot/teleoperate.py index a8c6e4f6..3b7ae838 100644 --- a/lerobot/teleoperate.py +++ b/lerobot/teleoperate.py @@ -58,7 +58,7 @@ from lerobot.common.teleoperators import ( from lerobot.common.utils.utils import init_logging, move_cursor_up from lerobot.common.utils.visualization_utils import _init_rerun -from .common.teleoperators import koch_leader, so100_leader, gamepad # noqa: F401 +from .common.teleoperators import gamepad, koch_leader, so100_leader # noqa: F401 @dataclass