From 96fa48b5ec96ec489bc8444817842fa541a53ffd Mon Sep 17 00:00:00 2001 From: Simon Alibert <75076266+aliberts@users.noreply.github.com> Date: Fri, 13 Jun 2025 18:23:07 +0200 Subject: [PATCH] Robot integration tutorial (#1285) --- docs/source/_toctree.yml | 2 + docs/source/integrate_hardware.mdx | 321 +++++++++++++++++++ lerobot/common/robots/robot.py | 87 ++++- lerobot/common/teleoperators/teleoperator.py | 85 ++++- 4 files changed, 483 insertions(+), 12 deletions(-) create mode 100644 docs/source/integrate_hardware.mdx diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 8430368e..0e83a1fe 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -9,6 +9,8 @@ title: Getting Started with Real-World Robots - local: cameras title: Cameras + - local: integrate_hardware + title: Bring Your Own Hardware - local: hilserl title: Train a Robot with RL - local: hilserl_sim diff --git a/docs/source/integrate_hardware.mdx b/docs/source/integrate_hardware.mdx new file mode 100644 index 00000000..0c29121d --- /dev/null +++ b/docs/source/integrate_hardware.mdx @@ -0,0 +1,321 @@ +# Bring Your Own Hardware + +This tutorial will explain how to integrate your own robot design into the LeRobot ecosystem and have it access all of our tools (data collection, control pipelines, policy training and inference). + +To that end, we provide the [`Robot`](https://github.com/huggingface/lerobot/blob/main/lerobot/common/robots/robot.py) base class in the LeRobot which specifies a standard interface for physical robot integration. Let's see how to implement it. + +## Prerequisites + +- Your own robot which exposes a communication interface (e.g. serial, CAN, TCP) +- A way to read sensor data and send motor commands programmatically, e.g. manufacturer's SDK or API, or your own protocol implementation. +- LeRobot installed in your environment. Follow our [Installation Guide](./installation). + +## Choose your motors + +If you're using Feetech or Dynamixel motors, LeRobot provides built-in bus interfaces: + +- [`FeetechMotorsBus`](https://github.com/huggingface/lerobot/blob/main/lerobot/common/motors/feetech/feetech.py) – for controlling Feetech servos +- [`DynamixelMotorsBus`](https://github.com/huggingface/lerobot/blob/main/lerobot/common/motors/dynamixel/dynamixel.py) – for controlling Dynamixel servos + +Please refer to the [`MotorsBus`](https://github.com/huggingface/lerobot/blob/main/lerobot/common/motors/motors_bus.py) abstract class to learn about its API. +For a good example of how it can be used, you can have a look at our own [SO101 follower implementation](https://github.com/huggingface/lerobot/blob/main/lerobot/common/robots/so101_follower/so101_follower.py) + +Use these if compatible! Otherwise, you'll need to find or write a Python interface (not covered in this tutorial): +- Find an existing SDK in Python (or use bindings to C/C++) +- Or implement a basic communication wrapper (e.g., via pyserial, socket, or CANopen) + +You're not alone—many community contributions use custom boards or firmware! + +For Feetech and Dynamixel, we currently support these servos: + - Feetech: + - STS & SMS series (protocol 0): `sts3215`, `sts3250`, `sm8512bl` + - SCS series (protocol 1): `scs0009` + - Dynamixel (protocol 2.0 only): `xl330-m077`, `xl330-m288`, `xl430-w250`, `xm430-w350`, `xm540-w270`, `xc430-w150` + +If you are using Feetech or Dynamixel servos that are not in this list, you can add those in the [Feetech table](https://github.com/huggingface/lerobot/blob/main/lerobot/common/motors/feetech/tables.py) or [Dynamixel table](https://github.com/huggingface/lerobot/blob/main/lerobot/common/motors/dynamixel/tables.py). Depending on the model, this will require you to add model-specific information. In most cases though, there should be a lot of additions to do. + +In the next sections, we'll use a `FeetechMotorsBus` as the motors interface for the examples. Replace it and adapt to your motors if necessary. + +## Step 1: Subclass the `Robot` Interface + +You’ll first need to specify the config class and a string identifier (`name`) for your robot. If your robot has special needs that you'd like to be able to change easily, it should go here (e.g. port/address, baudrate). + +Here, we'll add the port name and one camera by default for our robot: +```python +from dataclasses import dataclass, field + +from lerobot.common.cameras import CameraConfig +from lerobot.common.cameras.opencv import OpenCVCameraConfig +from lerobot.common.robots import RobotConfig + + +@RobotConfig.register_subclass("my_cool_robot") +@dataclass +class MyCoolRobotConfig(RobotConfig): + port: str + cameras: dict[str, CameraConfig] = field( + default_factory={ + "cam_1": OpenCVCameraConfig( + index_or_path=2, + fps=30, + width=480, + height=640, + ), + } + ) +``` + +Have a look at our [Cameras tutorial](./cameras) to understand how to detect and add your camera. + +Next, we'll create our actual robot class which inherits from `Robot`. This abstract class defines a contract you must follow for your robot to be usable with the rest of the LeRobot tools. + +Here we'll create a simple 5-DoF robot with one camera. It could be a simple arm but notice that the `Robot` abstract class does not assume anything on your robot's form factor. You can let you imagination run wild when designing new robots! + +```python +from lerobot.common.cameras import make_cameras_from_configs +from lerobot.common.motors import Motor, MotorNormMode +from lerobot.common.motors.feetech import FeetechMotorsBus +from lerobot.common.robots import Robot + +class MyCoolRobot(Robot): + config_class = MyCoolRobotConfig + name = "my_cool_robot" + + def __init__(self, config: MyCoolRobotConfig): + super().__init__(config) + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + "joint_1": Motor(1, "sts3250", MotorNormMode.RANGE_M100_100), + "joint_2": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "joint_3": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "joint_4": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "joint_5": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + }, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) +``` + +## Step 2: Define Observation and Action Features + +These two properties define the *interface contract* between your robot and tools that consume it (such as data collection or learning pipelines). + +> [!WARNING] +> Note that these properties must be callable even if the robot is not yet connected, so avoid relying on runtime hardware state to define them. + +### `observation_features` + +This property should return a dictionary describing the structure of sensor outputs from your robot. The keys match what `get_observation()` returns, and the values describe either the shape (for arrays/images) or the type (for simple values). + +Example for our 5-DoF arm with one camera: +```python +@property +def _motors_ft(self) -> dict[str, type]: + return { + "joint_1.pos": float, + "joint_2.pos": float, + "joint_3.pos": float, + "joint_4.pos": float, + "joint_5.pos": float, + } + +@property +def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.cameras[cam].height, self.cameras[cam].width, 3) for cam in self.cameras + } + +@property +def observation_features(self) -> dict: + return {**self._motors_ft, **self._cameras_ft} +``` +In this case, observations consist of a simple dict storing each motor's position and a camera image. + +### `action_features` + +This property describes the commands your robot expects via `send_action()`. Again, keys must match the expected input format, and values define the shape/type of each command. + +Here, we simply use the same joints proprioceptive features (`self._motors_ft`) as with `observation_features`: the action sent will simply the goal position for each motor. +```python +def action_features(self) -> dict: + return self._motors_ft +``` + +## Step 3: Handle Connection and Disconnection + +These methods should handle opening and closing communication with your hardware (e.g. serial ports, CAN interfaces, USB devices, cameras). + +### `is_connected` + +This property should simply reflect that communication with the robot's hardware is established. When this property is `True`, it should be possible to read and write to the hardware using `get_observation()` and `send_action()`. + +```python +@property +def is_connected(self) -> bool: + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) +``` + +### `connect()` + +This method should establish communication with the hardware. Moreover, if your robot needs calibration is not calibrated, it should start a calibration procedure by default. If your robot needs some specific configuration, this should also be called here. + +```python +def connect(self, calibrate: bool = True) -> None: + self.bus.connect() + if not self.is_calibrated and calibrate: + self.calibrate() + + for cam in self.cameras.values(): + cam.connect() + + self.configure() +``` + +### `disconnect()` + +This method should gracefully terminate communication with the hardware: free any related resources (threads or processes), close ports, etc. + +Here, we already handle this in our `MotorsBus` and `Camera` classes so we just need to call their own `disconnect()` methods: +```python +def disconnect(self) -> None: + self.bus.disconnect() + for cam in self.cameras.values(): + cam.disconnect() +``` + +## Step 4: Support Calibration and Configuration + +LeRobot supports saving and loading calibration data automatically. This is useful for joint offsets, zero positions, or sensor alignment. + +> Note that depending on your hardware, this may not apply. If that's the case, you can simply leave these methods as no-ops: +> ```python +> @property +> def is_calibrated(self) -> bool: +> return True +> +> def calibrate(self) -> None: +> pass +> ``` + +### `is_calibrated` + +This should reflect whether your robot has the required calibration loaded. + +```python +@property +def is_calibrated(self) -> bool: + return self.bus.is_calibrated +``` + +### `calibrate()` + +The goal of the calibration is twofold: + - Know the physical range of motion of each motors in order to only send commands within this range. + - Normalize raw motors positions to sensible continuous values (e.g. percentages, degrees) instead of arbitrary discrete value dependant on the specific motor used that will not replicate elsewhere. + +It should implement the logic for calibration (if relevant) and update the `self.calibration` dictionary. If you are using Feetech or Dynamixel motors, our bus interfaces already include methods to help with this. + +```python +def calibrate(self) -> None: + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings() + + print( + "Move all joints sequentially through their entire ranges " + "of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion() + + self.calibration = {} + for motor, m in self.bus.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) +``` + +### `configure()` + +Use this to set up any configuration for your hardware (servos control modes, controller gains, etc.). This should usually be run at connection time and be idempotent. + +```python +def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.write("P_Coefficient", motor, 16) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 32) +``` + +## Step 5: Implement Sensors Reading and Action Sending + +These are the most important runtime functions: the core I/O loop. + +### `get_observation()` + +Returns a dictionary of sensor values from the robot. These typically include motor states, camera frames, various sensors, etc. In the LeRobot framework, these observations are what will be fed to a policy in order to predict the actions to take. The dictionary keys and structure must match `observation_features`. + +```python +def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise RuntimeError("Robot is not connected") + + joint_pos = self.motor_interface.read_joint_positions() + gripper = self.motor_interface.read_gripper_state() + image = self.camera.get_frame() + + return { + "joint_positions": joint_pos, + "gripper_open": gripper, + "camera_image": image, + } +``` + +### `send_action()` + +Takes a dictionary that matches `action_features`, and sends it to your hardware. You can add safety limits (clipping, smoothing) and return what was actually sent. + +```python +def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise RuntimeError("Robot is not connected") + + self.motor_interface.set_joint_positions(action["joint_position_goals"]) + self.motor_interface.set_gripper(action["gripper_command"]) + + return action +``` + + + +## Adding a Teleoperator + +For implementing teleoperation devices, we also provide a [`Teleoperator`](https://github.com/huggingface/lerobot/blob/main/lerobot/common/teleoperators/teleoperator.py) base class. This class is very similar to the `Robot` base class and also doesn't assume anything on form factor. + +The main differences are in the I/O functions: a teleoperator allows you to produce action via `get_action` and can receive feedback actions via `send_feedback`. Feedback could be anything controllable on the teleoperation device that could help the person controlling it understand the consequences of the actions sent. Think motion/force feedback on a leader arm, vibrations on a gamepad controller for example. To implement a teleoperator, you can follow this same tutorial and adapt it for these two methods. + +## Wrapping Up + +Once your robot class is complete, you can leverage the LeRobot ecosystem: + +- Control your robot with available teleoperators or integrate directly your teleoperating device +- Record training data and visualize it +- Integrate it into RL or imitation learning pipelines + +Don't hesitate to reach out to the community for help on our [Discord](https://discord.gg/s3KuuzsPFb) 🤗 diff --git a/lerobot/common/robots/robot.py b/lerobot/common/robots/robot.py index e5af9e79..ec2b155f 100644 --- a/lerobot/common/robots/robot.py +++ b/lerobot/common/robots/robot.py @@ -27,7 +27,16 @@ from .config import RobotConfig # TODO(aliberts): action/obs typing such as Generic[ObsType, ActType] similar to gym.Env ? # https://github.com/Farama-Foundation/Gymnasium/blob/3287c869f9a48d99454306b0d4b4ec537f0f35e3/gymnasium/core.py#L23 class Robot(abc.ABC): - """The main LeRobot class for implementing robots.""" + """ + The base abstract class for all LeRobot-compatible robots. + + This class provides a standardized interface for interacting with physical robots. + Subclasses must implement all abstract methods and properties to be usable. + + Attributes: + config_class (RobotConfig): The expected configuration class for this robot. + name (str): The unique robot name used to identify this robot type. + """ # Set these in ALL subclasses config_class: RobotConfig @@ -52,58 +61,124 @@ class Robot(abc.ABC): @property @abc.abstractmethod def observation_features(self) -> dict: + """ + A dictionary describing the structure and types of the observations produced by the robot. + Its structure (keys) should match the structure of what is returned by :pymeth:`get_observation`. + Values for the dict should either be: + - The type of the value if it's a simple value, e.g. `float` for single proprioceptive value (a joint's position/velocity) + - A tuple representing the shape if it's an array-type value, e.g. `(height, width, channel)` for images + + Note: this property should be able to be called regardless of whether the robot is connected or not. + """ pass @property @abc.abstractmethod def action_features(self) -> dict: + """ + A dictionary describing the structure and types of the actions expected by the robot. Its structure + (keys) should match the structure of what is passed to :pymeth:`send_action`. Values for the dict + should be the type of the value if it's a simple value, e.g. `float` for single proprioceptive value + (a joint's goal position/velocity) + + Note: this property should be able to be called regardless of whether the robot is connected or not. + """ pass @property @abc.abstractmethod def is_connected(self) -> bool: + """ + Whether the robot is currently connected or not. If `False`, calling :pymeth:`get_observation` or + :pymeth:`send_action` should raise an error. + """ pass @abc.abstractmethod def connect(self, calibrate: bool = True) -> None: - """Connects to the robot.""" + """ + Establish communication with the robot. + + Args: + calibrate (bool): If True, automatically calibrate the robot after connecting if it's not + calibrated or needs calibration (this is hardware-dependant). + """ pass @property @abc.abstractmethod def is_calibrated(self) -> bool: + """Whether the robot is currently calibrated or not. Should be always `True` if not applicable""" pass @abc.abstractmethod def calibrate(self) -> None: - """Calibrates the robot.""" + """ + Calibrate the robot if applicable. If not, this should be a no-op. + + This method should collect any necessary data (e.g., motor offsets) and update the + :pyattr:`calibration` dictionary accordingly. + """ pass def _load_calibration(self, fpath: Path | None = None) -> None: + """ + Helper to load calibration data from the specified file. + + Args: + fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`. + """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath) as f, draccus.config_type("json"): self.calibration = draccus.load(dict[str, MotorCalibration], f) def _save_calibration(self, fpath: Path | None = None) -> None: + """ + Helper to save calibration data to the specified file. + + Args: + fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`. + """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath, "w") as f, draccus.config_type("json"): draccus.dump(self.calibration, f, indent=4) @abc.abstractmethod def configure(self) -> None: + """ + Apply any one-time or runtime configuration to the robot. + This may include setting motor parameters, control modes, or initial state. + """ pass @abc.abstractmethod def get_observation(self) -> dict[str, Any]: - """Gets observation from the robot.""" + """ + Retrieve the current observation from the robot. + + Returns: + dict[str, Any]: A flat dictionary representing the robot's current sensory state. Its structure + should match :pymeth:`observation_features`. + """ + pass @abc.abstractmethod def send_action(self, action: dict[str, Any]) -> dict[str, Any]: - """Sends actions to the robot.""" + """ + Send an action command to the robot. + + Args: + action (dict[str, Any]): Dictionary representing the desired action. Its structure should match + :pymeth:`action_features`. + + Returns: + dict[str, Any]: The action actually sent to the motors potentially clipped or modified, e.g. by + safety limits on velocity. + """ pass @abc.abstractmethod def disconnect(self) -> None: - """Disconnects from the robot.""" + """Disconnect from the robot and perform any necessary cleanup.""" pass diff --git a/lerobot/common/teleoperators/teleoperator.py b/lerobot/common/teleoperators/teleoperator.py index d8715a55..1fef8132 100644 --- a/lerobot/common/teleoperators/teleoperator.py +++ b/lerobot/common/teleoperators/teleoperator.py @@ -25,7 +25,16 @@ from .config import TeleoperatorConfig class Teleoperator(abc.ABC): - """The main LeRobot class for implementing teleoperation devices.""" + """ + The base abstract class for all LeRobot-compatible teleoperation devices. + + This class provides a standardized interface for interacting with physical teleoperators. + Subclasses must implement all abstract methods and properties to be usable. + + Attributes: + config_class (RobotConfig): The expected configuration class for this teleoperator. + name (str): The unique name used to identify this teleoperator type. + """ # Set these in ALL subclasses config_class: TeleoperatorConfig @@ -50,58 +59,122 @@ class Teleoperator(abc.ABC): @property @abc.abstractmethod def action_features(self) -> dict: + """ + A dictionary describing the structure and types of the actions produced by the teleoperator. Its + structure (keys) should match the structure of what is returned by :pymeth:`get_action`. Values for + the dict should be the type of the value if it's a simple value, e.g. `float` for single + proprioceptive value (a joint's goal position/velocity) + + Note: this property should be able to be called regardless of whether the robot is connected or not. + """ pass @property @abc.abstractmethod def feedback_features(self) -> dict: + """ + A dictionary describing the structure and types of the feedback actions expected by the robot. Its + structure (keys) should match the structure of what is passed to :pymeth:`send_feedback`. Values for + the dict should be the type of the value if it's a simple value, e.g. `float` for single + proprioceptive value (a joint's goal position/velocity) + + Note: this property should be able to be called regardless of whether the robot is connected or not. + """ pass @property @abc.abstractmethod def is_connected(self) -> bool: + """ + Whether the teleoperator is currently connected or not. If `False`, calling :pymeth:`get_action` + or :pymeth:`send_feedback` should raise an error. + """ pass @abc.abstractmethod def connect(self, calibrate: bool = True) -> None: - """Connects to the teleoperator.""" + """ + Establish communication with the teleoperator. + + Args: + calibrate (bool): If True, automatically calibrate the teleoperator after connecting if it's not + calibrated or needs calibration (this is hardware-dependant). + """ pass @property @abc.abstractmethod def is_calibrated(self) -> bool: + """Whether the teleoperator is currently calibrated or not. Should be always `True` if not applicable""" pass @abc.abstractmethod def calibrate(self) -> None: - """Calibrates the teleoperator.""" + """ + Calibrate the teleoperator if applicable. If not, this should be a no-op. + + This method should collect any necessary data (e.g., motor offsets) and update the + :pyattr:`calibration` dictionary accordingly. + """ pass def _load_calibration(self, fpath: Path | None = None) -> None: + """ + Helper to load calibration data from the specified file. + + Args: + fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`. + """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath) as f, draccus.config_type("json"): self.calibration = draccus.load(dict[str, MotorCalibration], f) def _save_calibration(self, fpath: Path | None = None) -> None: + """ + Helper to save calibration data to the specified file. + + Args: + fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`. + """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath, "w") as f, draccus.config_type("json"): draccus.dump(self.calibration, f, indent=4) @abc.abstractmethod def configure(self) -> None: + """ + Apply any one-time or runtime configuration to the teleoperator. + This may include setting motor parameters, control modes, or initial state. + """ pass @abc.abstractmethod def get_action(self) -> dict[str, Any]: - """Gets the action to send to a teleoperator.""" + """ + Retrieve the current action from the teleoperator. + + Returns: + dict[str, Any]: A flat dictionary representing the teleoperator's current actions. Its + structure should match :pymeth:`observation_features`. + """ pass @abc.abstractmethod def send_feedback(self, feedback: dict[str, Any]) -> None: - """Sends feedback captured from a robot to the teleoperator.""" + """ + Send a feedback action command to the teleoperator. + + Args: + action (dict[str, Any]): Dictionary representing the desired action. Its structure should match + :pymeth:`action_features`. + + Returns: + dict[str, Any]: The action actually sent to the motors potentially clipped or modified, e.g. by + safety limits on velocity. + """ pass @abc.abstractmethod def disconnect(self) -> None: - """Disconnects from the teleoperator.""" + """Disconnect from the teleoperator and perform any necessary cleanup.""" pass