From d322cb0220a1cffcd8577e2bb0af9145a84d0be4 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Sun, 11 May 2025 13:15:28 +0200 Subject: [PATCH] Add SO101 --- .../common/robots/so101_follower/README.md | 0 .../common/robots/so101_follower/__init__.py | 2 + .../so101_follower/config_so101_follower.py | 38 ++++ .../robots/so101_follower/so101_follower.py | 211 ++++++++++++++++++ lerobot/common/robots/utils.py | 4 + .../teleoperators/so101_leader/__init__.py | 2 + .../so101_leader/config_so101_leader.py | 26 +++ .../so101_leader/so101_leader.py | 142 ++++++++++++ lerobot/common/teleoperators/utils.py | 4 + 9 files changed, 429 insertions(+) rename examples/12_use_so101.md => lerobot/common/robots/so101_follower/README.md (100%) create mode 100644 lerobot/common/robots/so101_follower/__init__.py create mode 100644 lerobot/common/robots/so101_follower/config_so101_follower.py create mode 100644 lerobot/common/robots/so101_follower/so101_follower.py create mode 100644 lerobot/common/teleoperators/so101_leader/__init__.py create mode 100644 lerobot/common/teleoperators/so101_leader/config_so101_leader.py create mode 100644 lerobot/common/teleoperators/so101_leader/so101_leader.py diff --git a/examples/12_use_so101.md b/lerobot/common/robots/so101_follower/README.md similarity index 100% rename from examples/12_use_so101.md rename to lerobot/common/robots/so101_follower/README.md diff --git a/lerobot/common/robots/so101_follower/__init__.py b/lerobot/common/robots/so101_follower/__init__.py new file mode 100644 index 000000000..f6615b15b --- /dev/null +++ b/lerobot/common/robots/so101_follower/__init__.py @@ -0,0 +1,2 @@ +from .config_so101_follower import SO101FollowerConfig +from .so101_follower import SO101Follower diff --git a/lerobot/common/robots/so101_follower/config_so101_follower.py b/lerobot/common/robots/so101_follower/config_so101_follower.py new file mode 100644 index 000000000..c75f003bb --- /dev/null +++ b/lerobot/common/robots/so101_follower/config_so101_follower.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field + +from lerobot.common.cameras import CameraConfig + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("so101_follower") +@dataclass +class SO101FollowerConfig(RobotConfig): + # Port to connect to the arm + port: str + + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None + + # cameras + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/lerobot/common/robots/so101_follower/so101_follower.py b/lerobot/common/robots/so101_follower/so101_follower.py new file mode 100644 index 000000000..6d555005c --- /dev/null +++ b/lerobot/common/robots/so101_follower/so101_follower.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .config_so101_follower import SO101FollowerConfig + +logger = logging.getLogger(__name__) + + +class SO101Follower(Robot): + """ + SO-101 Follower Arm designed by TheRobotStudio and Hugging Face. + """ + + config_class = SO101FollowerConfig + name = "so101_follower" + + def __init__(self, config: SO101FollowerConfig): + super().__init__(config) + self.config = config + self.arm = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.arm.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected + + def connect(self, calibrate: bool = True) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.arm.connect() + if not self.is_calibrated and calibrate: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for motor in self.arm.motors: + self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + print( + "Move all joints sequentially through their entire ranges " + "of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion() + + self.calibration = {} + for motor, m in self.arm.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.arm.torque_disabled(): + self.arm.configure_motors() + for motor in self.arm.motors: + self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.arm.write("P_Coefficient", motor, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.arm.write("I_Coefficient", motor, 0) + self.arm.write("D_Coefficient", motor, 32) + + def setup_motors(self) -> None: + for motor in reversed(self.arm.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.arm.setup_motor(motor) + print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Read arm position + start = time.perf_counter() + obs_dict = self.arm.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + """Command arm to move to a target joint configuration. + + The relative action magnitude may be clipped depending on the configuration parameter + `max_relative_target`. In this case, the action sent differs from original action. + Thus, this function always returns the action actually sent. + + Raises: + RobotDeviceNotConnectedError: if robot is not connected. + + Returns: + the action sent to the motors, potentially clipped. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.arm.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + # Send goal position to the arm + self.arm.sync_write("Goal_Position", goal_pos) + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.arm.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/utils.py b/lerobot/common/robots/utils.py index 11200d182..1d802776b 100644 --- a/lerobot/common/robots/utils.py +++ b/lerobot/common/robots/utils.py @@ -45,6 +45,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot: from .so100_follower import SO100Follower return SO100Follower(config) + elif config.type == "so101_follower": + from .so101_follower import SO101Follower + + return SO101Follower(config) elif config.type == "lekiwi": from .lekiwi import LeKiwiClient diff --git a/lerobot/common/teleoperators/so101_leader/__init__.py b/lerobot/common/teleoperators/so101_leader/__init__.py new file mode 100644 index 000000000..1f45170e9 --- /dev/null +++ b/lerobot/common/teleoperators/so101_leader/__init__.py @@ -0,0 +1,2 @@ +from .config_so101_leader import SO101LeaderConfig +from .so101_leader import SO101Leader diff --git a/lerobot/common/teleoperators/so101_leader/config_so101_leader.py b/lerobot/common/teleoperators/so101_leader/config_so101_leader.py new file mode 100644 index 000000000..5f2e110da --- /dev/null +++ b/lerobot/common/teleoperators/so101_leader/config_so101_leader.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("so101_leader") +@dataclass +class SO101LeaderConfig(TeleoperatorConfig): + # Port to connect to the arm + port: str diff --git a/lerobot/common/teleoperators/so101_leader/so101_leader.py b/lerobot/common/teleoperators/so101_leader/so101_leader.py new file mode 100644 index 000000000..00c82304a --- /dev/null +++ b/lerobot/common/teleoperators/so101_leader/so101_leader.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..teleoperator import Teleoperator +from .config_so101_leader import SO101LeaderConfig + +logger = logging.getLogger(__name__) + + +class SO101Leader(Teleoperator): + """ + SO-101 Leader Arm designed by TheRobotStudio and Hugging Face. + """ + + config_class = SO101LeaderConfig + name = "so101_leader" + + def __init__(self, config: SO101LeaderConfig): + super().__init__(config) + self.config = config + self.arm = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + ) + + @property + def action_features(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.arm.motors} + + @property + def feedback_features(self) -> dict[str, type]: + return {} + + @property + def is_connected(self) -> bool: + return self.arm.is_connected + + def connect(self, calibrate: bool = True) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.arm.connect() + if not self.is_calibrated and calibrate: + self.calibrate() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for motor in self.arm.motors: + self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + print( + "Move all joints sequentially through their entire ranges " + "of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion() + + self.calibration = {} + for motor, m in self.arm.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + logger.info(f"Calibration saved to {self.calibration_fpath}") + + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + for motor in self.arm.motors: + self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + def setup_motors(self) -> None: + for motor in reversed(self.arm.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.arm.setup_motor(motor) + print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + + def get_action(self) -> dict[str, float]: + start = time.perf_counter() + action = self.arm.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") + return action + + def send_feedback(self, feedback: dict[str, float]) -> None: + # TODO(rcadene, aliberts): Implement force feedback + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.arm.disconnect() + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/utils.py b/lerobot/common/teleoperators/utils.py index ca355b516..f9fbbea29 100644 --- a/lerobot/common/teleoperators/utils.py +++ b/lerobot/common/teleoperators/utils.py @@ -15,6 +15,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: from .so100_leader import SO100Leader return SO100Leader(config) + elif config.type == "so101_leader": + from .so101_leader import SO101Leader + + return SO101Leader(config) elif config.type == "stretch3": from .stretch3_gamepad import Stretch3GamePad