Update robot features & naming

This commit is contained in:
Simon Alibert
2025-05-08 12:43:47 +02:00
parent 69dc3f5c9c
commit 87a8cb6d89
20 changed files with 639 additions and 341 deletions

View File

@@ -1,4 +1,3 @@
from .config import RobotConfig
from .robot import Robot
__all__ = ["RobotConfig", "Robot"]
from .utils import make_robot_from_config

View File

@@ -0,0 +1,330 @@
# Using the [Koch v1.1](https://github.com/jess-moss/koch-v1-1) with LeRobot
## Table of Contents
- [A. Order and Assemble the parts](#a-order-and-assemble-the-parts)
- [B. Install LeRobot](#b-install-lerobot)
- [C. Configure the Motors](#c-configure-the-motors)
- [D. Calibrate](#d-calibrate)
- [E. Teleoperate](#e-teleoperate)
- [F. Record a dataset](#f-record-a-dataset)
- [G. Visualize a dataset](#g-visualize-a-dataset)
- [H. Replay an episode](#h-replay-an-episode)
- [I. Train a policy](#i-train-a-policy)
- [J. Evaluate your policy](#j-evaluate-your-policy)
- [K. More Information](#k-more-information)
## A. Order and Assemble the parts
Follow the sourcing and assembling instructions provided on the [Koch v1.1 Github page](https://github.com/jess-moss/koch-v1-1). This will guide you through setting up both the follower and leader arms, as shown in the image below.
<div style="text-align:center;">
<img src="../media/tutorial/koch_v1_1_leader_follower.webp?raw=true" alt="Koch v1.1 leader and follower arms" title="Koch v1.1 leader and follower arms" width="50%">
</div>
For a visual walkthrough of the assembly process, you can refer to [this video tutorial](https://youtu.be/8nQIg9BwwTk).
> [!IMPORTANT]
> Since the production of this video, we simplified the configuration phase (detailed in [section C](#c-configure-the-motors)) of the motors.
> Because of this, two things differ from the instructions in that video:
> - Don't plug all the motors cables right away and wait for being instructed to do so in [section C](#c-configure-the-motors).
> - Don't screw in the controller board (PCB) to the base right away and wait for being instructed to do so in [section C](#c-configure-the-motors).
## B. Install LeRobot
> [!TIP]
> We use the Command Prompt (cmd) quite a lot. If you are not comfortable using the cmd or want to brush up using the command line you can have a look here: [Command line crash course](https://developer.mozilla.org/en-US/docs/Learn_web_development/Getting_started/Environment_setup/Command_line)
Follow instructions on our [README](https://github.com/huggingface/lerobot) to install LeRobot.
In addition to these instructions, you need to install the dynamixel sdk:
```bash
pip install -e ".[dynamixel]"
```
## C. Configure the motors
### 1. Find the USB ports associated to each arm
For each controller board (Waveshare Serial Bus Servo Driver Board, one for the leader arm and one for the follower), connect it first to your computer through usb. To then find the internal port its connected to -which we will need later on- run the utility script:
```bash
python -m lerobot.find_port
```
> [!NOTE]
> Note: On Linux, you might need to give access to the USB ports by running:
> ```bash
> sudo chmod 666 /dev/ttyACM0
> sudo chmod 666 /dev/ttyACM1
> ```
This will first display all currently available ports on your computer. As prompted by the script, unplug the controller board usb cable from your computer. The script will then detect which port has been disconnected and will display it.
Example output when identifying the leader arm's port (e.g., `/dev/tty.usbmodem575E0031751` on Mac, or possibly `/dev/ttyACM0` on Linux):
```
Finding all available ports for the MotorBus.
['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
Remove the usb cable from your MotorsBus and press Enter when done.
[...Disconnect leader arm and press Enter...]
The port of this MotorsBus is /dev/tty.usbmodem575E0031751
Reconnect the usb cable.
```
You can now reconnect the usb cable to your computer.
### 2. Set the motors ids and baudrate
Each motor is identified by a unique id on the bus. When brand new, motors usually come with a default id of `1`. For the communication to work properly between the motors and the controller, we first need to set a unique, different id to each motor. Additionally, the speed at which data is transmitted on the bus is determined by the baudrate. In order to talk to each other, the controller and all the motors need to be configured with the same baudrate.
To that end, we first need to connect to each motor individually with the controller in order to set these. Since we will write these parameters in the non-volatile section of the motors' internal memory (EEPROM), we'll only need to do this once.
> [!NOTE]
> Note: If you are repurposing motors from another robot, you will probably also need to perform this step as the ids and baudrate likely won't match.
Connect the usb cable from your computer and the 5V power supply to the leader arm's controller board. Then, run the following command with the port you got from the previous step. You'll also need to give your leader arm a name with the `id` parameter.
```bash
python -m lerobot.setup_motors \
--device.type=so100_leader \
--device.port=/dev/tty.usbmodem575E0031751 \ # <- paste here the port found at previous step
--device.id=my_awesome_leader_arm # <- give it a nice, unique name
```
Note that the command above is equivalent to running the following script:
<details>
<summary>Setup script</summary>
```python
from lerobot.common.teleoperators.koch import KochLeader, KochLeaderConfig
config = KochLeaderConfig(
port="/dev/tty.usbmodem575E0031751",
id="my_awesome_leader_arm",
)
leader = KochLeader(config)
leader.setup_motors()
```
</details>
You should see the following instruction
```
Connect the controller board to the 'gripper' motor only and press enter.
```
As instructed, plug the gripper's motor. Make sure it's the only motor connected to the board, and that the motor itself is not yet daisy chained to any other motor. As you press `[Enter]`, the script will automatically set the id and baudrate for that motor.
<details>
<summary>Troubleshooting</summary>
If you get an error at that point, check your cables and make sure they are plugged-in properly:
- Power supply
- USB cable between from your computer to the controller board
- The 3-pin cable from the controller board to the motor.
If you are using a Waveshare controller board, make sure that the two jumpers are set on the `B` channel (USB).
</details>
You should then see the following message:
```
'gripper' motor id set to 6
```
Followed by the next instruction:
```
Connect the controller board to the 'wrist_roll' motor only and press enter.
```
You can disconnect the 3-pin cable from the controller board but you can leave it connected to the gripper motor on the other end as it will already be in the right place. Now, plug-in another 3-pin cable to the wrist roll motor and connect it to the controller board. As with the previous motor, make sure it is the only motor connected to the board and that the motor itself isn't connected to any other one.
Repeat the operation for each motor as instructed.
> [!TIP]
> Check your cabling at each step before pressing Enter. For instance, the power supply cable is not solidly anchored to the board and might disconnect easily as you manipulate the board.
When you are done, the script will simply finish, at which point the motors are ready to be used. You can now plug the 3-pin cable from each motor to the next one, and the cable from the first motor (the 'shoulder pan' with id=1) to the controller board, which can now be attached to the base of the arm.
## D. Calibrate
Next, you'll need to calibrate your SO-100 robot to ensure that the leader and follower arms have the same position values when they are in the same physical position. This calibration is essential because it allows a neural network trained on one SO-100 robot to work on another.
#### a. Manual calibration of follower arm
> [!IMPORTANT]
> Contrarily to step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the auto calibration, we will actually do manual calibration of follower for now.
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| <img src="../media/so100/follower_zero.webp?raw=true" alt="SO-100 follower arm zero position" title="SO-100 follower arm zero position" style="width:100%;"> | <img src="../media/so100/follower_rotated.webp?raw=true" alt="SO-100 follower arm rotated position" title="SO-100 follower arm rotated position" style="width:100%;"> | <img src="../media/so100/follower_rest.webp?raw=true" alt="SO-100 follower arm rest position" title="SO-100 follower arm rest position" style="width:100%;"> |
Make sure both arms are connected and run this script to launch manual calibration:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--robot.cameras='{}' \
--control.type=calibrate \
--control.arms='["main_follower"]'
```
#### b. Manual calibration of leader arm
Follow step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| <img src="../media/so100/leader_zero.webp?raw=true" alt="SO-100 leader arm zero position" title="SO-100 leader arm zero position" style="width:100%;"> | <img src="../media/so100/leader_rotated.webp?raw=true" alt="SO-100 leader arm rotated position" title="SO-100 leader arm rotated position" style="width:100%;"> | <img src="../media/so100/leader_rest.webp?raw=true" alt="SO-100 leader arm rest position" title="SO-100 leader arm rest position" style="width:100%;"> |
Run this script to launch manual calibration:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--robot.cameras='{}' \
--control.type=calibrate \
--control.arms='["main_leader"]'
```
## E. Teleoperate
**Simple teleop**
Then you are ready to teleoperate your robot! Run this simple script (it won't connect and display the cameras):
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--robot.cameras='{}' \
--control.type=teleoperate
```
#### a. Teleop with displaying cameras
Follow [this guide to setup your cameras](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#c-add-your-cameras-with-opencvcamera). Then you will be able to display the cameras on your computer while you are teleoperating by running the following code. This is useful to prepare your setup before recording your first dataset.
> **NOTE:** To visualize the data, enable `--control.display_data=true`. This streams the data using `rerun`.
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--control.type=teleoperate
```
## F. Record a dataset
Once you're familiar with teleoperation, you can record your first dataset with SO-100.
If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens):
```bash
huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
```
Store your Hugging Face repository name in a variable to run these commands:
```bash
HF_USER=$(huggingface-cli whoami | head -n 1)
echo $HF_USER
```
Record 2 episodes and upload your dataset to the hub:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/so100_test \
--control.tags='["so100","tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=2 \
--control.push_to_hub=true
```
Note: You can resume recording by adding `--control.resume=true`.
## G. Visualize a dataset
If you uploaded your dataset to the hub with `--control.push_to_hub=true`, you can [visualize your dataset online](https://huggingface.co/spaces/lerobot/visualize_dataset) by copy pasting your repo id given by:
```bash
echo ${HF_USER}/so100_test
```
If you didn't upload with `--control.push_to_hub=false`, you can also visualize it locally with (a window can be opened in the browser `http://127.0.0.1:9090` with the visualization tool):
```bash
python lerobot/scripts/visualize_dataset_html.py \
--repo-id ${HF_USER}/so100_test \
--local-files-only 1
```
## H. Replay an episode
Now try to replay the first episode on your robot:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--control.type=replay \
--control.fps=30 \
--control.repo_id=${HF_USER}/so100_test \
--control.episode=0
```
## I. Train a policy
To train a policy to control your robot, use the [`python lerobot/scripts/train.py`](../lerobot/scripts/train.py) script. A few arguments are required. Here is an example command:
```bash
python lerobot/scripts/train.py \
--dataset.repo_id=${HF_USER}/so100_test \
--policy.type=act \
--output_dir=outputs/train/act_so100_test \
--job_name=act_so100_test \
--policy.device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/so100_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_so100_test/checkpoints`.
To resume training from a checkpoint, below is an example command to resume from `last` checkpoint of the `act_so100_test` policy:
```bash
python lerobot/scripts/train.py \
--config_path=outputs/train/act_so100_test/checkpoints/last/pretrained_model/train_config.json \
--resume=true
```
## J. Evaluate your policy
You can use the `record` function from [`lerobot/scripts/control_robot.py`](../lerobot/scripts/control_robot.py) but with a policy checkpoint as input. For instance, run this command to record 10 evaluation episodes:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/eval_act_so100_test \
--control.tags='["tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=10 \
--control.push_to_hub=true \
--control.policy.path=outputs/train/act_so100_test/checkpoints/last/pretrained_model
```
As you can see, it's almost the same command as previously used to record your training dataset. Two things changed:
1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint with (e.g. `outputs/train/eval_act_so100_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_so100_test`).
2. The name of dataset begins by `eval` to reflect that you are running inference (e.g. `${HF_USER}/eval_act_so100_test`).
## K. More Information
Follow this [previous tutorial](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#4-train-a-policy-on-your-data) for a more in-depth tutorial on controlling real robots with LeRobot.
> [!TIP]
> If you have any questions or need help, please reach out on [Discord](https://discord.com/invite/s3KuuzsPFb) in the channel [`#so100-arm`](https://discord.com/channels/1216765309076115607/1237741463832363039).

View File

@@ -16,10 +16,11 @@
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.constants import OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.dynamixel import (
@@ -62,27 +63,22 @@ class KochFollower(Robot):
self.cameras = make_cameras_from_configs(config.cameras)
@property
def state_feature(self) -> dict:
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@property
def action_feature(self) -> dict:
return self.state_feature
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
@@ -184,13 +180,14 @@ class KochFollower(Robot):
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -212,7 +209,7 @@ class KochFollower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = action
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
@@ -223,7 +220,7 @@ class KochFollower(Robot):
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
return goal_pos
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:

View File

@@ -1,4 +0,0 @@
from .configuration_moss import MossRobotConfig
from .robot_moss import MossRobot
__all__ = ["MossRobotConfig", "MossRobot"]

View File

@@ -1,223 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
import numpy as np
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import TorqueMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
apply_feetech_offsets_from_calibration,
run_full_arm_calibration,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_moss import MossRobotConfig
class MossRobot(Robot):
"""
[Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss
"""
config_class = MossRobotConfig
name = "moss"
def __init__(self, config: MossRobotConfig):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": config.shoulder_pan,
"shoulder_lift": config.shoulder_lift,
"elbow_flex": config.elbow_flex,
"wrist_flex": config.wrist_flex,
"wrist_roll": config.wrist_roll,
"gripper": config.gripper,
},
)
self.cameras = make_cameras_from_configs(config.cameras)
self.is_connected = False
self.logs = {}
@property
def state_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
}
@property
def action_feature(self) -> dict:
return self.state_feature
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
)
logging.info("Connecting arm.")
self.arm.connect()
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate()
# Mode=0 for Position Control
self.arm.write("Mode", 0)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", 0)
self.arm.write("D_Coefficient", 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.arm.write("Lock", 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.arm.write("Maximum_Acceleration", 254)
self.arm.write("Acceleration", 254)
logging.info("Activating torque.")
self.arm.write("Torque_Enable", TorqueMode.ENABLED.value)
# Check arm can be read
self.arm.read("Present_Position")
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.is_connected = True
def calibrate(self) -> None:
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)
apply_feetech_offsets_from_calibration(self.arm, calibration)
def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension."""
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
obs_dict = {}
# Read arm position
before_read_t = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.read("Present_Position")
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
# Capture images from cameras
for cam_key, cam in self.cameras.items():
before_camread_t = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t
return obs_dict
def send_action(self, action: np.ndarray) -> np.ndarray:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Args:
action (np.ndarray): array containing the goal positions for the motors.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
np.ndarray: the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
goal_pos = action
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.read("Present_Position")
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.write("Goal_Position", goal_pos.astype(np.int32))
return goal_pos
def print_logs(self):
# TODO(aliberts): move robot-specific logs logic here
pass
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
)
self.arm.disconnect()
for cam in self.cameras.values():
cam.disconnect()
self.is_connected = False

View File

@@ -0,0 +1,2 @@
from .configuration_moss import MossRobotConfig
from .moss_follower import MossRobot

View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_moss import MossRobotConfig
logger = logging.getLogger(__name__)
class MossRobot(Robot):
"""
[Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss
"""
config_class = MossRobotConfig
name = "moss_follower"
def __init__(self, config: MossRobotConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
def connect(self) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
if not self.is_calibrated:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor]
print(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.arm.torque_disabled():
self.arm.configure_motors()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", motor, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", motor, 0)
self.arm.write("D_Coefficient", motor, 32)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
obs_dict = self.arm.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -36,15 +36,11 @@ class Robot(abc.ABC):
# TODO(aliberts): create a proper Feature class for this that links with datasets
@abc.abstractproperty
def state_feature(self) -> dict:
def observation_features(self) -> dict:
pass
@abc.abstractproperty
def action_feature(self) -> dict:
pass
@abc.abstractproperty
def camera_features(self) -> dict[str, dict]:
def action_features(self) -> dict:
pass
@abc.abstractproperty

View File

@@ -16,10 +16,10 @@
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
@@ -60,27 +60,22 @@ class SO100Follower(Robot):
self.cameras = make_cameras_from_configs(config.cameras)
@property
def state_feature(self) -> dict:
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@property
def action_feature(self) -> dict:
return self.state_feature
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
@@ -164,18 +159,17 @@ class SO100Follower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
obs_dict = self.arm.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -197,7 +191,7 @@ class SO100Follower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = action
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
@@ -208,7 +202,7 @@ class SO100Follower(Robot):
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
return goal_pos
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:

View File

@@ -0,0 +1,2 @@
from .configuration_stretch3 import Stretch3RobotConfig
from .robot_stretch3 import Stretch3Robot

View File

@@ -72,7 +72,7 @@ class Stretch3Robot(Robot):
self.action_keys = None
@property
def state_feature(self) -> dict:
def observation_features(self) -> dict:
return {
"dtype": "float32",
"shape": (len(STRETCH_MOTORS),),
@@ -80,8 +80,8 @@ class Stretch3Robot(Robot):
}
@property
def action_feature(self) -> dict:
return self.state_feature
def action_features(self) -> dict:
return self.observation_features
@property
def camera_features(self) -> dict[str, dict]:

View File

@@ -1,47 +1,27 @@
import logging
from pprint import pformat
from typing import Protocol
from lerobot.common.robots import RobotConfig
def get_arm_id(name, arm_type):
"""Returns the string identifier of a robot arm. For instance, for a bimanual manipulator
like Aloha, it could be left_follower, right_follower, left_leader, or right_leader.
"""
return f"{name}_{arm_type}"
# TODO(aliberts): Remove and point to lerobot.common.robots.Robot
class Robot(Protocol):
robot_type: str
features: dict
def connect(self): ...
def run_calibration(self): ...
def teleop_step(self, record_data=False): ...
def capture_observation(self): ...
def send_action(self, action): ...
def disconnect(self): ...
from .robot import Robot
def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
if robot_type == "aloha":
from .aloha.configuration_aloha import AlohaRobotConfig
raise NotImplementedError # TODO
return AlohaRobotConfig(**kwargs)
elif robot_type == "koch_follower":
from .koch.config_koch_follower import KochFollowerConfig
from .koch_follower.config_koch_follower import KochFollowerConfig
return KochFollowerConfig(**kwargs)
# elif robot_type == "koch_bimanual":
# return KochBimanualRobotConfig(**kwargs)
elif robot_type == "moss":
from .moss.configuration_moss import MossRobotConfig
from .moss_follower.configuration_moss import MossRobotConfig
return MossRobotConfig(**kwargs)
elif robot_type == "so100_leader":
from .so100.config_so100_follower import SO100FollowerConfig
from .so100_follower.config_so100_follower import SO100FollowerConfig
return SO100FollowerConfig(**kwargs)
elif robot_type == "stretch":
@@ -56,23 +36,29 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
raise ValueError(f"Robot type '{robot_type}' is not available.")
def make_robot_from_config(config: RobotConfig):
from .lekiwi.config_lekiwi import LeKiwiConfig
from .manipulator import ManipulatorRobotConfig
def make_robot_from_config(config: RobotConfig) -> Robot:
if config.type == "koch_follower":
from .koch_follower import KochFollower
if isinstance(config, ManipulatorRobotConfig):
from lerobot.common.robots.manipulator import ManipulatorRobot
return KochFollower(config)
elif config.type == "so100_follower":
from .so100_follower import SO100Follower
return ManipulatorRobot(config)
elif isinstance(config, LeKiwiConfig):
from lerobot.common.robots.lekiwi import LeKiwiClient
return SO100Follower(config)
elif config.type == "lekiwi":
from .lekiwi import LeKiwiClient
return LeKiwiClient(config)
...
else:
from lerobot.common.robots.stretch3.robot_stretch3 import Stretch3Robot
elif config.type == "stretch3":
from .stretch3 import Stretch3Robot
return Stretch3Robot(config)
elif config.type == "viperx":
from .viperx import ViperX
return ViperX(config)
else:
raise ValueError(config.type)
def make_robot(robot_type: str, **kwargs) -> Robot:
@@ -116,3 +102,11 @@ def ensure_safe_goal_position(
)
return safe_goal_positions
# TODO(aliberts): Remove
def get_arm_id(name, arm_type):
"""Returns the string identifier of a robot arm. For instance, for a bimanual manipulator
like Aloha, it could be left_follower, right_follower, left_leader, or right_leader.
"""
return f"{name}_{arm_type}"

View File

@@ -6,10 +6,11 @@ and send orders to its motors.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.constants import OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.dynamixel import (
@@ -55,28 +56,22 @@ class ViperX(Robot):
self.cameras = make_cameras_from_configs(config.cameras)
@property
def state_feature(self) -> dict:
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@property
def action_feature(self) -> dict:
return self.state_feature
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}"
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
@@ -177,13 +172,14 @@ class ViperX(Robot):
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -205,7 +201,7 @@ class ViperX(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = action
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
@@ -216,7 +212,7 @@ class ViperX(Robot):
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
return goal_pos
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected: