Compare commits

..

23 Commits

Author SHA1 Message Date
Simon Alibert
897bfe588a post-merge fix 2025-03-10 19:28:24 +01:00
Simon Alibert
74c9ee5959 Fix imports 2025-03-10 19:06:24 +01:00
Simon Alibert
e07595b346 Merge remote-tracking branch 'origin/user/pepijn/2025_01_27_steps_assembly_intructions' into user/aliberts/2025_03_10_new_feetech_calibration 2025-03-10 18:59:30 +01:00
Pepijn
b5bc3d1a25 Move to middle 2025-02-19 16:34:36 +01:00
Pepijn
25a5b9ad0b Zero homing, same values as previous calib 2025-02-19 15:22:07 +01:00
Pepijn
645bfd78d8 With multi turn (code is good, but feetech can't handle multi turn commands) 2025-02-19 14:51:01 +01:00
Pepijn
18ae1b802d set drive mode 2025-02-19 11:34:56 +01:00
Pepijn
042c095ddd Add back drive mode 2025-02-19 10:38:25 +01:00
Pepijn
8088110aac add todo 2025-02-18 19:49:59 +01:00
Pepijn
e8f90ef3f5 Fix with virtual zero in same position as old calibration virtual zero 2025-02-18 18:28:00 +01:00
Pepijn
63e6f7901e Added calibration visualization 2025-02-18 15:04:25 +01:00
Pepijn
27c6b84b1f merge main 2025-02-10 13:22:26 +01:00
Pepijn
514ce5d0b1 improve test method 2025-01-31 09:26:42 +01:00
Pepijn
b7a343e797 add multi turn support 2025-01-30 19:59:26 +01:00
Pepijn
1d121e3321 update configure 2025-01-30 19:22:21 +01:00
Pepijn
3ef54ab019 remove cache files 2025-01-30 13:58:44 +01:00
Pepijn
f540eb81ff fix write issue 2025-01-30 13:32:07 +01:00
Pepijn
0ffe2520f4 change manual motor calib 2025-01-30 13:01:00 +01:00
Pepijn
fa8ed524fa add first setup new calibration 2025-01-29 16:45:47 +01:00
Pepijn
5eefafa958 test formatting github 2025-01-28 16:44:56 +01:00
Pepijn
0193294898 add table of contents 2025-01-28 09:41:36 +01:00
Pepijn
3d6f245aa7 add test 2025-01-27 16:25:46 +01:00
Pepijn
409601474f test emoji 2025-01-27 16:24:41 +01:00
115 changed files with 475 additions and 749 deletions

View File

@@ -73,7 +73,7 @@ pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
!tests/artifacts
!tests/data
htmlcov/
.tox/
.nox/

2
.gitignore vendored
View File

@@ -78,7 +78,7 @@ pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
!tests/artifacts
!tests/data
htmlcov/
.tox/
.nox/

View File

@@ -12,17 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exclude: "tests/artifacts/.*\\.safetensors$"
exclude: ^(tests/data)
default_language_version:
python: python3.10
repos:
##### Meta #####
- repo: meta
hooks:
- id: check-useless-excludes
- id: check-hooks-apply
##### Style / Misc. #####
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
@@ -35,37 +28,31 @@ repos:
- id: check-toml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.30.2
rev: v1.30.0
hooks:
- id: typos
args: [--force-exclude]
- repo: https://github.com/asottile/pyupgrade
rev: v3.19.1
hooks:
- id: pyupgrade
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.10
rev: v0.9.9
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
##### Security #####
- repo: https://github.com/gitleaks/gitleaks
rev: v8.24.0
hooks:
- id: gitleaks
- repo: https://github.com/woodruffw/zizmor-pre-commit
rev: v1.4.1
hooks:
- id: zizmor
- repo: https://github.com/PyCQA/bandit
rev: 1.8.3
hooks:

View File

@@ -291,7 +291,7 @@ sudo apt-get install git-lfs
git lfs install
```
Pull artifacts if they're not in [tests/artifacts](tests/artifacts)
Pull artifacts if they're not in [tests/data](tests/data)
```bash
git lfs pull
```

View File

@@ -232,8 +232,8 @@ python lerobot/scripts/eval.py \
--env.type=pusht \
--eval.batch_size=10 \
--eval.n_episodes=10 \
--policy.use_amp=false \
--policy.device=cuda
--use_amp=false \
--device=cuda
```
Note: After training your own policy, you can re-evaluate the checkpoints with:

View File

@@ -1,5 +1,5 @@
This tutorial will explain the training script, how to use it, and particularly how to configure everything needed for the training run.
> **Note:** The following assume you're running these commands on a machine equipped with a cuda GPU. If you don't have one (or if you're using a Mac), you can add `--policy.device=cpu` (`--policy.device=mps` respectively). However, be advised that the code executes much slower on cpu.
> **Note:** The following assume you're running these commands on a machine equipped with a cuda GPU. If you don't have one (or if you're using a Mac), you can add `--device=cpu` (`--device=mps` respectively). However, be advised that the code executes much slower on cpu.
## The training script

View File

@@ -386,14 +386,14 @@ When you connect your robot for the first time, the [`ManipulatorRobot`](../lero
Here are the positions you'll move the follower arm to:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/koch/follower_zero.webp?raw=true" alt="Koch v1.1 follower arm zero position" title="Koch v1.1 follower arm zero position" style="width:100%;"> | <img src="../media/koch/follower_rotated.webp?raw=true" alt="Koch v1.1 follower arm rotated position" title="Koch v1.1 follower arm rotated position" style="width:100%;"> | <img src="../media/koch/follower_rest.webp?raw=true" alt="Koch v1.1 follower arm rest position" title="Koch v1.1 follower arm rest position" style="width:100%;"> |
And here are the corresponding positions for the leader arm:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/koch/leader_zero.webp?raw=true" alt="Koch v1.1 leader arm zero position" title="Koch v1.1 leader arm zero position" style="width:100%;"> | <img src="../media/koch/leader_rotated.webp?raw=true" alt="Koch v1.1 leader arm rotated position" title="Koch v1.1 leader arm rotated position" style="width:100%;"> | <img src="../media/koch/leader_rest.webp?raw=true" alt="Koch v1.1 leader arm rest position" title="Koch v1.1 leader arm rest position" style="width:100%;"> |
You can watch a [video tutorial of the calibration procedure](https://youtu.be/8drnU9uRY24) for more details.
@@ -898,14 +898,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_koch_test \
--job_name=act_koch_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/koch_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
For more information on the `train` script see the previous tutorial: [`examples/4_train_policy_with_script.md`](../examples/4_train_policy_with_script.md)

View File

@@ -49,7 +49,7 @@ def find_cameras(raise_when_empty=True, mock=False) -> list[dict]:
connected to the computer.
"""
if mock:
import tests.cameras.mock_pyrealsense2 as rs
import tests.mock_pyrealsense2 as rs
else:
import pyrealsense2 as rs
@@ -101,7 +101,7 @@ def save_images_from_cameras(
serial_numbers = [cam["serial_number"] for cam in camera_infos]
if mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -252,7 +252,7 @@ class RealSenseCamera(Camera):
self.logs = {}
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -284,7 +284,7 @@ class RealSenseCamera(Camera):
raise DeviceAlreadyConnectedError(f"RealSenseCamera({self.serial_number}) is already connected.")
if self.mock:
import tests.cameras.mock_pyrealsense2 as rs
import tests.mock_pyrealsense2 as rs
else:
import pyrealsense2 as rs
@@ -372,7 +372,7 @@ class RealSenseCamera(Camera):
)
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2

View File

@@ -81,7 +81,7 @@ def _find_cameras(
possible_camera_ids: list[int | str], raise_when_empty=False, mock=False
) -> list[int | str]:
if mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -270,7 +270,7 @@ class OpenCVCamera(Camera):
self.logs = {}
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -287,7 +287,7 @@ class OpenCVCamera(Camera):
raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -399,7 +399,7 @@ class OpenCVCamera(Camera):
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2

View File

@@ -332,7 +332,7 @@ class DynamixelMotorsBus:
)
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -356,7 +356,7 @@ class DynamixelMotorsBus:
def reconnect(self):
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -646,7 +646,7 @@ class DynamixelMotorsBus:
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -691,7 +691,7 @@ class DynamixelMotorsBus:
start_time = time.perf_counter()
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -757,7 +757,7 @@ class DynamixelMotorsBus:
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -793,7 +793,7 @@ class DynamixelMotorsBus:
start_time = time.perf_counter()
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl

View File

@@ -1,9 +1,4 @@
from .feetech import FeetechMotorsBus, TorqueMode
from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration
from .feetech_calibration import run_full_arm_calibration
__all__ = [
"FeetechMotorsBus",
"TorqueMode",
"apply_feetech_offsets_from_calibration",
"run_full_arm_calibration",
]
__all__ = ["FeetechMotorsBus", "TorqueMode", "run_full_arm_calibration"]

View File

@@ -136,21 +136,24 @@ def convert_degrees_to_ticks(degrees, model):
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int:
def adjusted_to_homing_ticks(
raw_motor_ticks: int, encoder_offset: int, model: str, motorbus, motor_id: int
) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
Shifts raw [0..4095] ticks by an encoder offset, modulo a single turn [0..4095].
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# Add offset and wrap within resolution
ticks = (raw_motor_ticks + encoder_offset) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
# # Re-center into a symmetric range (e.g., [-2048, 2047] if resolutions==4096) Thus the middle homing position will be virtual 0.
if ticks > resolutions // 2:
ticks -= resolutions
# Flip sign if drive_mode is set.
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
@@ -161,12 +164,15 @@ def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_i
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int:
def adjusted_to_motor_ticks(
adjusted_pos: int, encoder_offset: int, model: str, motorbus, motor_id: int
) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
Inverse of adjusted_to_homing_ticks().
"""
# Flip sign if drive_mode was set.
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
@@ -176,9 +182,8 @@ def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: i
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
# Remove offset and wrap within resolution
ticks = (adjusted_pos - encoder_offset) % resolutions
return ticks
@@ -347,7 +352,7 @@ class FeetechMotorsBus:
)
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -371,7 +376,7 @@ class FeetechMotorsBus:
def reconnect(self):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -448,10 +453,11 @@ class FeetechMotorsBus:
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
homing_offset = self.calibration["homing_offset"][calib_idx]
motor_idx, model = self.motors[name]
# Convert raw motor ticks to homed ticks, then convert the homed ticks to degrees
values[i] = adjusted_to_homing_ticks(values[i], model, self, motor_idx)
values[i] = adjusted_to_homing_ticks(values[i], homing_offset, model, self, motor_idx)
values[i] = convert_ticks_to_degrees(values[i], model)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
@@ -484,11 +490,12 @@ class FeetechMotorsBus:
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
homing_offset = self.calibration["homing_offset"][calib_idx]
motor_idx, model = self.motors[name]
# Convert degrees to homed ticks, then convert the homed ticks to raw ticks
values[i] = convert_degrees_to_ticks(values[i], model)
values[i] = adjusted_to_motor_ticks(values[i], model, self, motor_idx)
values[i] = adjusted_to_motor_ticks(values[i], homing_offset, model, self, motor_idx)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
@@ -503,7 +510,7 @@ class FeetechMotorsBus:
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -541,7 +548,7 @@ class FeetechMotorsBus:
def read(self, data_name, motor_names: str | list[str] | None = None):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -614,7 +621,7 @@ class FeetechMotorsBus:
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -650,7 +657,7 @@ class FeetechMotorsBus:
start_time = time.perf_counter()
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs

View File

@@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Logic to calibrate a robot arm built with feetech motors"""
# TODO(rcadene, aliberts): move this logic into the robot code when refactoring
import numpy as np
from ..motors_bus import MotorsBus
from .feetech import (
CalibrationMode,
FeetechMotorsBus,
TorqueMode,
)
@@ -38,37 +41,31 @@ def get_calibration_modes(arm: MotorsBus):
]
def reset_offset(motor_id, motor_bus):
# Open the write lock, changes to EEPROM do NOT persist yet
motor_bus.write("Lock", 1)
# Set offset to 0
motor_name = motor_bus.motor_names[motor_id - 1]
motor_bus.write("Offset", 0, motor_names=[motor_name])
# Close the write lock, changes to EEPROM do persist
motor_bus.write("Lock", 0)
# Confirm that the offset is zero by reading it back
confirmed_offset = motor_bus.read("Offset")[motor_id - 1]
print(f"Offset for motor {motor_id} reset to: {confirmed_offset}")
return confirmed_offset
def calibrate_homing_motor(motor_id, motor_bus):
reset_offset(motor_id, motor_bus)
"""
1) Reads servo ticks.
2) Calculates the offset so that 'home_ticks' becomes 0.
3) Returns the offset
"""
home_ticks = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts at 0
print(f"Encoder offset (present position in homing position): {home_ticks}")
return home_ticks
# Calculate how many ticks to shift so that 'home_ticks' becomes 0
raw_offset = -home_ticks # negative of home_ticks
encoder_offset = raw_offset % 4096 # wrap to [0..4095]
# Convert to a signed range [-2048..2047]
if encoder_offset > 2047:
encoder_offset -= 4096
print(f"Encoder offset: {encoder_offset}")
return encoder_offset
def calibrate_linear_motor(motor_id, motor_bus):
motor_names = motor_bus.motor_names
motor_name = motor_names[motor_id - 1]
reset_offset(motor_id, motor_bus)
motor_name = motor_names[motor_id - 1] # TODO(pepijn): replace motor_id with motor index when (id-1)
input(f"Close the {motor_name}, then press Enter...")
start_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
@@ -163,7 +160,7 @@ def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm
print(f"\n calibration of {robot_type} {arm_name} {arm_type} done!")
# Force drive_mode values (can be static)
# Force drive_mode values: motors 2 and 5 -> drive_mode 1; all others -> 0. 1 = clockwise = positive range (0..180), 0 = clockwise = negative range (0..-180)
drive_modes = [0, 1, 0, 0, 1, 0]
calib_dict = {
@@ -185,70 +182,3 @@ def run_full_auto_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str
```
"""
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")

View File

@@ -1,31 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras.configs import CameraConfig
from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.common.robots.config import RobotConfig
@RobotConfig.register_subclass("dummy")
@dataclass
class DummyConfig(RobotConfig):
id = "dummy"
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"cam": OpenCVCameraConfig(camera_index=0, fps=30, width=1280, height=720),
}
)

View File

@@ -1,101 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import numpy as np
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from .configuration_dummy import DummyConfig
class Dummy(Robot):
config_class = DummyConfig
name = "dummy"
def __init__(self, config: DummyConfig):
super().__init__(config)
self.cameras = make_cameras_from_configs(config.cameras)
self.is_connected = False
@property
def state_feature(self) -> dict:
logging.warning("Dummy has nothing to send.")
@property
def action_feature(self) -> dict:
logging.warning("Dummy has nothing to send.")
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {
"cam": {
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
"info": None,
},
}
return cam_ft
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"Dummy is already connected. Do not run `robot.connect()` twice."
)
logging.info("Connecting cameras.")
for cam in self.cameras.values():
cam.connect()
self.is_connected = True
def calibrate(self) -> None:
logging.warning("Dummy has nothing to calibrate.")
return
def get_observation(self) -> dict[str, np.ndarray]:
if not self.is_connected:
raise DeviceNotConnectedError("Dummy is not connected. You need to run `robot.connect()`.")
obs_dict = {}
for cam_key, cam in self.cameras.items():
frame = cam.async_read()
obs_dict[f"{OBS_STATE}.{cam_key}"] = frame
return obs_dict
def send_action(self, action: np.ndarray) -> np.ndarray:
logging.warning("Dummy has nothing to send.")
def print_logs(self):
pass
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(
"Dummy is not connected. You need to run `robot.connect()` before disconnecting."
)
for cam in self.cameras.values():
cam.disconnect()
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()

View File

@@ -1,71 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import rerun as rr
from lerobot.common.robots.dummy.configuration_dummy import DummyConfig
from lerobot.common.robots.dummy.dummy import Dummy
from lerobot.common.teleoperators.so100 import SO100Teleop, SO100TeleopConfig
# IMO, it's better to use rerun in the application code instead of the library code
def main():
logging.info("Configuring Devices")
leader_arm_config = SO100TeleopConfig(port="/dev/tty.usbmodem58760434171")
leader_arm = SO100Teleop(leader_arm_config)
robot_config = DummyConfig()
robot = Dummy(robot_config)
logging.info("Connecting SO100 Devices")
leader_arm.connect()
logging.info("Connecting Dummy")
robot.connect()
rr.init("rerun_dummy_data")
# If data source and visualizer are in different host, use .connect() instead to establish a tcp connection
# We can define a custom blueprint configuration for the visualizer panels
# Memory limit will make sure no more than 5% of the memory is used by the visualizer
rr.spawn(memory_limit="5%")
logging.info("Starting...")
i = 0
while i < 10000:
# An alternative would be do rerun log inside of these methods. But then that means embedding rerun into the library
arm_action = leader_arm.get_action()
observation = robot.get_observation()
for j in range(arm_action.size):
# If you want to disable batching we can do it: export RERUN_FLUSH_NUM_BYTES=256
current_time = time.time()
rr.set_time_seconds(f"arm_action_{j}", current_time)
rr.log(f"arm_action_{j}", rr.Scalar(arm_action[j]))
for k, v in observation.items():
# This discards all previous image frames
rr.set_time_seconds(k, 0.0)
rr.log(k, rr.Image(v))
i += 1
robot.disconnect()
leader_arm.disconnect()
if __name__ == "__main__":
main()

View File

@@ -16,5 +16,15 @@ class KochRobotConfig(RobotConfig):
# the number of motors in your follower arms.
max_relative_target: int | None = None
mock: bool = False
# motors
shoulder_pan: tuple = (1, "xl430-w250")
shoulder_lift: tuple = (2, "xl430-w250")
elbow_flex: tuple = (3, "xl330-m288")
wrist_flex: tuple = (4, "xl330-m288")
wrist_roll: tuple = (5, "xl330-m288")
gripper: tuple = (6, "xl330-m288")
# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@@ -49,16 +49,17 @@ class KochRobot(Robot):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
"shoulder_pan": config.shoulder_pan,
"shoulder_lift": config.shoulder_lift,
"elbow_flex": config.elbow_flex,
"wrist_flex": config.wrist_flex,
"wrist_roll": config.wrist_roll,
"gripper": config.gripper,
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@@ -128,17 +129,19 @@ class KochRobot(Robot):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.config.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "follower")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)

View File

@@ -366,8 +366,8 @@ Now we have to calibrate the leader arm and the follower arm. The wheel motors d
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/lekiwi/mobile_calib_zero.webp?raw=true" alt="SO-100 follower arm zero position" title="SO-100 follower arm zero position" style="width:100%;"> | <img src="../media/lekiwi/mobile_calib_rotated.webp?raw=true" alt="SO-100 follower arm rotated position" title="SO-100 follower arm rotated position" style="width:100%;"> | <img src="../media/lekiwi/mobile_calib_rest.webp?raw=true" alt="SO-100 follower arm rest position" title="SO-100 follower arm rest position" style="width:100%;"> |
Make sure the arm is connected to the Raspberry Pi and run this script (on the Raspberry Pi) to launch manual calibration:
@@ -385,8 +385,8 @@ If you have the **wired** LeKiwi version please run all commands including this
### Calibrate leader arm
Then to calibrate the leader arm (which is attached to the laptop/pc). You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/so100/leader_zero.webp?raw=true" alt="SO-100 leader arm zero position" title="SO-100 leader arm zero position" style="width:100%;"> | <img src="../media/so100/leader_rotated.webp?raw=true" alt="SO-100 leader arm rotated position" title="SO-100 leader arm rotated position" style="width:100%;"> | <img src="../media/so100/leader_rest.webp?raw=true" alt="SO-100 leader arm rest position" title="SO-100 leader arm rest position" style="width:100%;"> |
Run this script (on your laptop/pc) to launch manual calibration:
@@ -416,22 +416,22 @@ python lerobot/scripts/control_robot.py \
You should see on your laptop something like this: ```[INFO] Connected to remote robot at tcp://172.17.133.91:5555 and video stream at tcp://172.17.133.91:5556.``` Now you can move the leader arm and use the keyboard (w,a,s,d) to drive forward, left, backwards, right. And use (z,x) to turn left or turn right. You can use (r,f) to increase and decrease the speed of the mobile robot. There are three speed modes, see the table below:
| Speed Mode | Linear Speed (m/s) | Rotation Speed (deg/s) |
| ---------- | ------------------ | ---------------------- |
| Fast | 0.4 | 90 |
| Medium | 0.25 | 60 |
| Slow | 0.1 | 30 |
|------------|-------------------|-----------------------|
| Fast | 0.4 | 90 |
| Medium | 0.25 | 60 |
| Slow | 0.1 | 30 |
| Key | Action |
| --- | -------------- |
| W | Move forward |
| A | Move left |
| S | Move backward |
| D | Move right |
| Z | Turn left |
| X | Turn right |
| R | Increase speed |
| F | Decrease speed |
| Key | Action |
|------|--------------------------------|
| W | Move forward |
| A | Move left |
| S | Move backward |
| D | Move right |
| Z | Turn left |
| X | Turn right |
| R | Increase speed |
| F | Decrease speed |
> [!TIP]
> If you use a different keyboard you can change the keys for each command in the [`LeKiwiRobotConfig`](../lerobot/common/robot_devices/robots/configs.py).
@@ -549,14 +549,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_lekiwi_test \
--job_name=act_lekiwi_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/lekiwi_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_lekiwi_test/checkpoints`.

View File

@@ -12,7 +12,7 @@ import zmq
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceNotConnectedError
from lerobot.common.motors.feetech.feetech import TorqueMode
from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration
from lerobot.common.motors.feetech.feetech_calibration import run_arm_manual_calibration
from lerobot.common.motors.motors_bus import MotorsBus
from lerobot.common.motors.utils import make_motors_buses_from_configs
from lerobot.common.robots.lekiwi.configuration_lekiwi import LeKiwiRobotConfig
@@ -253,7 +253,7 @@ class MobileManipulator:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:

View File

@@ -81,73 +81,6 @@ class ManipulatorRobotConfig(RobotConfig):
)
def apply_feetech_offsets_from_calibration(motorsbus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
class ManipulatorRobot:
# TODO(rcadene): Implement force feedback
"""This class allows to control any manipulator robot of various number of motors.
@@ -421,24 +354,12 @@ class ManipulatorRobot:
return calibration
# For each follower arm
for name, arm_bus in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "follower")
arm_bus.set_calibration(calibration)
# If this is a Feetech robot, also set the servo offset into EEPROM
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
# For each leader arm
for name, arm_bus in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "leader")
arm_bus.set_calibration(calibration)
# Optionally also set offset for leader if you want the servo offsets as well
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
for name, arm in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm, "follower")
arm.set_calibration(calibration)
for name, arm in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm, "leader")
arm.set_calibration(calibration)
def set_koch_robot_preset(self):
def set_operating_mode_(arm):

View File

@@ -176,8 +176,8 @@ Next, you'll need to calibrate your Moss v1 robot to ensure that the leader and
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/moss/follower_zero.webp?raw=true" alt="Moss v1 follower arm zero position" title="Moss v1 follower arm zero position" style="width:100%;"> | <img src="../media/moss/follower_rotated.webp?raw=true" alt="Moss v1 follower arm rotated position" title="Moss v1 follower arm rotated position" style="width:100%;"> | <img src="../media/moss/follower_rest.webp?raw=true" alt="Moss v1 follower arm rest position" title="Moss v1 follower arm rest position" style="width:100%;"> |
Make sure both arms are connected and run this script to launch manual calibration:
@@ -192,8 +192,8 @@ python lerobot/scripts/control_robot.py \
**Manual calibration of leader arm**
Follow step 6 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/moss/leader_zero.webp?raw=true" alt="Moss v1 leader arm zero position" title="Moss v1 leader arm zero position" style="width:100%;"> | <img src="../media/moss/leader_rotated.webp?raw=true" alt="Moss v1 leader arm rotated position" title="Moss v1 leader arm rotated position" style="width:100%;"> | <img src="../media/moss/leader_rest.webp?raw=true" alt="Moss v1 leader arm rest position" title="Moss v1 leader arm rest position" style="width:100%;"> |
Run this script to launch manual calibration:
@@ -293,14 +293,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_moss_test \
--job_name=act_moss_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/moss_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_moss_test/checkpoints`.

View File

@@ -26,7 +26,6 @@ from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnecte
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
TorqueMode,
apply_feetech_offsets_from_calibration,
run_full_arm_calibration,
)
@@ -47,6 +46,7 @@ class MossRobot(Robot):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = FeetechMotorsBus(
port=self.config.port,
@@ -133,21 +133,22 @@ class MossRobot(Robot):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.config.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)
apply_feetech_offsets_from_calibration(self.arm, calibration)
def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension."""

View File

@@ -1,13 +1,12 @@
import abc
from typing import Any
import numpy as np
from lerobot.common.constants import HF_LEROBOT_CALIBRATION, ROBOTS
from .config import RobotConfig
# TODO(aliberts): action/obs typing such as Generic[ObsType, ActType] similar to gym.Env ?
# https://github.com/Farama-Foundation/Gymnasium/blob/3287c869f9a48d99454306b0d4b4ec537f0f35e3/gymnasium/core.py#L23
class Robot(abc.ABC):
"""The main LeRobot class for implementing robots."""
@@ -17,12 +16,10 @@ class Robot(abc.ABC):
def __init__(self, config: RobotConfig):
self.robot_type = self.name
self.id = config.id
self.calibration_dir = (
config.calibration_dir if config.calibration_dir else HF_LEROBOT_CALIBRATION / ROBOTS / self.name
)
self.calibration_dir.mkdir(parents=True, exist_ok=True)
self.calibration_fpath = self.calibration_dir / f"{self.id}.json"
# TODO(aliberts): create a proper Feature class for this that links with datasets
@abc.abstractproperty
@@ -48,12 +45,12 @@ class Robot(abc.ABC):
pass
@abc.abstractmethod
def get_observation(self) -> dict[str, Any]:
def get_observation(self) -> dict[str, np.ndarray]:
"""Gets observation from the robot."""
pass
@abc.abstractmethod
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
def send_action(self, action: np.ndarray) -> np.ndarray:
"""Sends actions to the robot."""
pass

View File

@@ -454,8 +454,8 @@ Next, you'll need to calibrate your SO-100 robot to ensure that the leader and f
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/so100/follower_zero.webp?raw=true" alt="SO-100 follower arm zero position" title="SO-100 follower arm zero position" style="width:100%;"> | <img src="../media/so100/follower_rotated.webp?raw=true" alt="SO-100 follower arm rotated position" title="SO-100 follower arm rotated position" style="width:100%;"> | <img src="../media/so100/follower_rest.webp?raw=true" alt="SO-100 follower arm rest position" title="SO-100 follower arm rest position" style="width:100%;"> |
Make sure both arms are connected and run this script to launch manual calibration:
@@ -470,8 +470,8 @@ python lerobot/scripts/control_robot.py \
#### b. Manual calibration of leader arm
Follow step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/so100/leader_zero.webp?raw=true" alt="SO-100 leader arm zero position" title="SO-100 leader arm zero position" style="width:100%;"> | <img src="../media/so100/leader_rotated.webp?raw=true" alt="SO-100 leader arm rotated position" title="SO-100 leader arm rotated position" style="width:100%;"> | <img src="../media/so100/leader_rest.webp?raw=true" alt="SO-100 leader arm rest position" title="SO-100 leader arm rest position" style="width:100%;"> |
Run this script to launch manual calibration:
@@ -571,14 +571,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_so100_test \
--job_name=act_so100_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/so100_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_so100_test/checkpoints`.

View File

@@ -1,4 +1,4 @@
from .configuration_so100 import SO100RobotConfig
from .robot_so100 import SO100Robot
from .configuration_so100 import So100RobotConfig
from .robot_so100 import So100Robot
__all__ = ["SO100RobotConfig", "SO100Robot"]
__all__ = ["So100RobotConfig", "So100Robot"]

View File

@@ -7,7 +7,7 @@ from ..config import RobotConfig
@RobotConfig.register_subclass("so100")
@dataclass
class SO100RobotConfig(RobotConfig):
class So100RobotConfig(RobotConfig):
# Port to connect to the robot
port: str
@@ -16,5 +16,15 @@ class SO100RobotConfig(RobotConfig):
# the number of motors in your follower arms.
max_relative_target: int | None = None
mock: bool = False
# motors
shoulder_pan: tuple = (1, "sts3215")
shoulder_lift: tuple = (2, "sts3215")
elbow_flex: tuple = (3, "sts3215")
wrist_flex: tuple = (4, "sts3215")
wrist_roll: tuple = (5, "sts3215")
gripper: tuple = (6, "sts3215")
# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@@ -26,37 +26,37 @@ from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnecte
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
TorqueMode,
apply_feetech_offsets_from_calibration,
run_full_arm_calibration,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_so100 import SO100RobotConfig
from .configuration_so100 import So100RobotConfig
class SO100Robot(Robot):
class So100Robot(Robot):
"""
[SO-100 Follower Arm](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio
[SO-100 Arm](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio
"""
config_class = SO100RobotConfig
name = "so100"
config_class = So100RobotConfig
name = "koch"
def __init__(self, config: SO100RobotConfig):
def __init__(self, config: So100RobotConfig):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": (1, "sts3215"),
"shoulder_lift": (2, "sts3215"),
"elbow_flex": (3, "sts3215"),
"wrist_flex": (4, "sts3215"),
"wrist_roll": (5, "sts3215"),
"gripper": (6, "sts3215"),
"shoulder_pan": config.shoulder_pan,
"shoulder_lift": config.shoulder_lift,
"elbow_flex": config.elbow_flex,
"wrist_flex": config.wrist_flex,
"wrist_roll": config.wrist_roll,
"gripper": config.gripper,
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@@ -133,21 +133,22 @@ class SO100Robot(Robot):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.config.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)
apply_feetech_offsets_from_calibration(self.arm, calibration)
def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension."""

View File

@@ -42,9 +42,9 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
return MossRobotConfig(**kwargs)
elif robot_type == "so100":
from .so100.configuration_so100 import SO100RobotConfig
from .so100.configuration_so100 import So100RobotConfig
return SO100RobotConfig(**kwargs)
return So100RobotConfig(**kwargs)
elif robot_type == "stretch":
from .stretch3.configuration_stretch3 import Stretch3RobotConfig

View File

@@ -135,14 +135,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_aloha_test \
--job_name=act_aloha_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/aloha_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
For more information on the `train` script see the previous tutorial: [`examples/4_train_policy_with_script.md`](../examples/4_train_policy_with_script.md)

View File

@@ -39,6 +39,7 @@ class ViperXRobot(Robot):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = DynamixelMotorsBus(
port=self.config.port,
@@ -149,17 +150,19 @@ class ViperXRobot(Robot):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.config.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "follower")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)

View File

@@ -55,6 +55,7 @@ class KeyboardTeleop(Teleoperator):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.pressed_keys = {}
self.listener = None

View File

@@ -46,6 +46,7 @@ class KochTeleop(Teleoperator):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = DynamixelMotorsBus(
port=self.config.port,
@@ -105,17 +106,19 @@ class KochTeleop(Teleoperator):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "leader")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)

View File

@@ -24,3 +24,13 @@ from ..config import TeleoperatorConfig
class SO100TeleopConfig(TeleoperatorConfig):
# Port to connect to the teloperator
port: str
mock: bool = False
# motors
shoulder_pan: tuple = (1, "sts3215")
shoulder_lift: tuple = (2, "sts3215")
elbow_flex: tuple = (3, "sts3215")
wrist_flex: tuple = (4, "sts3215")
wrist_roll: tuple = (5, "sts3215")
gripper: tuple = (6, "sts3215")

View File

@@ -24,7 +24,6 @@ from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnecte
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
TorqueMode,
apply_feetech_offsets_from_calibration,
run_full_arm_calibration,
)
@@ -34,7 +33,7 @@ from .configuration_so100 import SO100TeleopConfig
class SO100Teleop(Teleoperator):
"""
[SO-100 Leader Arm](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio
[SO-100 Arm](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio
"""
config_class = SO100TeleopConfig
@@ -44,16 +43,17 @@ class SO100Teleop(Teleoperator):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": (1, "sts3215"),
"shoulder_lift": (2, "sts3215"),
"elbow_flex": (3, "sts3215"),
"wrist_flex": (4, "sts3215"),
"wrist_roll": (5, "sts3215"),
"gripper": (6, "sts3215"),
"shoulder_pan": config.shoulder_pan,
"shoulder_lift": config.shoulder_lift,
"elbow_flex": config.elbow_flex,
"wrist_flex": config.wrist_flex,
"wrist_roll": config.wrist_roll,
"gripper": config.gripper,
},
)
@@ -96,21 +96,22 @@ class SO100Teleop(Teleoperator):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "leader")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)
apply_feetech_offsets_from_calibration(self.arm, calibration)
def get_action(self) -> np.ndarray:
"""The returned action does not have a batch dimension."""

View File

@@ -1,5 +1,6 @@
import abc
from typing import Any
import numpy as np
from lerobot.common.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
@@ -14,14 +15,12 @@ class Teleoperator(abc.ABC):
name: str
def __init__(self, config: TeleoperatorConfig):
self.id = config.id
self.calibration_dir = (
config.calibration_dir
if config.calibration_dir
else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name
)
self.calibration_dir.mkdir(parents=True, exist_ok=True)
self.calibration_fpath = self.calibration_dir / f"{self.id}.json"
@abc.abstractproperty
def action_feature(self) -> dict:
@@ -42,12 +41,12 @@ class Teleoperator(abc.ABC):
pass
@abc.abstractmethod
def get_action(self) -> dict[str, Any]:
def get_action(self) -> np.ndarray:
"""Gets the action to send to a teleoperator."""
pass
@abc.abstractmethod
def send_feedback(self, feedback: dict[str, Any]) -> None:
def send_feedback(self, feedback: np.ndarray) -> None:
"""Sends feedback captured from a robot to the teleoperator."""
pass

View File

@@ -43,6 +43,7 @@ class WidowXTeleop(Teleoperator):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.id = config.id
self.arm = DynamixelMotorsBus(
port=self.config.port,
@@ -119,17 +120,19 @@ class WidowXTeleop(Teleoperator):
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
arm_calib_path = self.calibration_dir / f"{self.id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
logging.info(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "leader")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)

View File

@@ -0,0 +1,126 @@
import argparse
import json
import time
from pathlib import Path
from lerobot.common.robot_devices.motors.feetech import (
FeetechMotorsBus,
adjusted_to_homing_ticks,
adjusted_to_motor_ticks,
convert_degrees_to_ticks,
convert_ticks_to_degrees,
)
# Replace this import with your real config class import
# (e.g., So100RobotConfig or any other)
from lerobot.common.robot_devices.robots.configs import So100RobotConfig
from lerobot.common.robot_devices.robots.utils import make_robot_from_config
def debug_feetech_positions(cfg, arm_arg: str):
"""
Reads each joints (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
:param cfg: A config object (e.g. So100RobotConfig).
:param arm_arg: One of "main_leader" or "main_follower".
"""
# 1) Make the Robot from your config
# e.g. `So100RobotConfig` might already be the "robot object",
# or if it is purely a config structure, do: robot = make_robot_from_config(cfg).
robot = make_robot_from_config(cfg)
# 2) Parse which arm we want: 'main_leader' or 'main_follower'
if arm_arg not in ("main_leader", "main_follower"):
raise ValueError("Please specify --arm=main_leader or --arm=main_follower")
bus_config = robot.leader_arms["main"] if arm_arg == "main_leader" else robot.follower_arms["main"]
# 3) Create the Feetech bus from that config
bus = FeetechMotorsBus(bus_config)
# 4) (Optional) Load calibration if it exists
calib_file = Path(robot.calibration_dir) / f"{arm_arg}.json"
if calib_file.exists():
with open(calib_file) as f:
calibration_dict = json.load(f)
bus.set_calibration(calibration_dict)
print(f"Loaded calibration from {calib_file}")
else:
print(f"No calibration file found at {calib_file}, skipping calibration set.")
# 5) Connect to the bus
bus.connect()
print(f"Connected to Feetech bus on port: {bus.port}")
# 6) Disable torque on all motors so you can move them freely by hand
bus.write("Torque_Enable", 0, motor_names=bus.motor_names)
print("Torque disabled on all joints.")
try:
print("\nPress Ctrl+C to quit.\n")
while True:
# (a) Read *raw* positions (no calibration)
raw_positions = bus.read_with_motor_ids(
bus.motor_models, bus.motor_indices, data_name="Present_Position"
)
# (b) Read *already-homed* positions (calibration is applied automatically)
homed_positions = bus.read("Present_Position")
# Print them side by side
for i, name in enumerate(bus.motor_names):
motor_idx, model = bus.motors[name]
raw_ticks = raw_positions[i] # 0..4095
homed_val = homed_positions[i] # degrees or % if linear
# If you want to see how offset is used inside bus.read(), do it manually:
offset = 0
if bus.calibration and name in bus.calibration["motor_names"]:
offset_idx = bus.calibration["motor_names"].index(name)
offset = bus.calibration["homing_offset"][offset_idx]
# Manually compute "adjusted ticks" from raw ticks
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, offset, model, bus, motor_idx)
# Convert to degrees
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
# Convert to ticks
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
# Invert
inv_ticks = adjusted_to_motor_ticks(manual_ticks, offset, model, bus, motor_idx)
print(
f"{name:15s} | "
f"RAW={raw_ticks:4d} | "
f"HOMED_FROM_READ={homed_val:7.2f} | "
f"HOMED_TICKS={manual_adjusted:6d} | "
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
f"INV_TICKS={inv_ticks:4d} "
)
print("----------------------------------------------------")
time.sleep(0.25) # slow down loop
except KeyboardInterrupt:
pass
finally:
print("\nExiting. Disconnecting bus...")
bus.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Debug Feetech positions.")
parser.add_argument(
"--arm",
type=str,
choices=["main_leader", "main_follower"],
default="main_leader",
help="Which arm to debug: 'main_leader' or 'main_follower'.",
)
args = parser.parse_args()
# 1) Instantiate the config you want (So100RobotConfig, or whichever your project uses).
cfg = So100RobotConfig()
# 2) Call the function with (cfg, args.arm)
debug_feetech_positions(cfg, arm_arg=args.arm)

View File

@@ -1,100 +0,0 @@
"""
usage:
```python
python lerobot/scripts/calibration_visualization_so100.py \
--teleop.type=so100 \
--teleop.port=/dev/tty.usbmodem58760430541
python lerobot/scripts/calibration_visualization_so100.py \
--robot.type=so100 \
--robot.port=/dev/tty.usbmodem585A0084711
```
"""
import time
from dataclasses import dataclass
import draccus
from lerobot.common.motors.feetech.feetech import (
adjusted_to_homing_ticks,
adjusted_to_motor_ticks,
convert_degrees_to_ticks,
convert_ticks_to_degrees,
)
from lerobot.common.robots import RobotConfig
from lerobot.common.robots.so100 import SO100Robot, SO100RobotConfig # noqa: F401
from lerobot.common.teleoperators import TeleoperatorConfig
from lerobot.common.teleoperators.so100 import SO100Teleop, SO100TeleopConfig # noqa: F401
@dataclass
class DebugFeetechConfig:
teleop: TeleoperatorConfig | None = None
robot: RobotConfig | None = None
def __post_init__(self):
if bool(self.teleop) == bool(self.robot):
raise ValueError("Select a single device.")
@draccus.wrap()
def debug_feetech_positions(cfg: DebugFeetechConfig):
"""
Reads each joint's (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
"""
device = SO100Teleop(cfg.teleop) if cfg.teleop else SO100Robot(cfg.robot)
device.connect()
# Disable torque on all motors so you can move them freely by hand
device.arm.write("Torque_Enable", 0, motor_names=device.arm.motor_names)
print("Torque disabled on all joints.")
try:
print("\nPress Ctrl+C to quit.\n")
while True:
# Read *raw* positions (no calibration).
raw_positions = device.arm.read_with_motor_ids(
device.arm.motor_models, device.arm.motor_indices, data_name="Present_Position"
)
# Read *already-homed* positions
homed_positions = device.arm.read("Present_Position")
for i, name in enumerate(device.arm.motor_names):
motor_idx, model = device.arm.motors[name]
raw_ticks = raw_positions[i] # 0..4095
homed_val = homed_positions[i] # degrees or % if linear
# Manually compute "adjusted ticks" from raw ticks
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, device.arm, motor_idx)
# Convert to degrees
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
# Convert that deg back to ticks
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
# Then invert them using offset & bus drive mode
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, device.arm, motor_idx)
print(
f"{name:15s} | "
f"RAW={raw_ticks:4d} | "
f"HOMED_FROM_READ={homed_val:7.2f} | "
f"HOMED_TICKS={manual_adjusted:6d} | "
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
f"INV_TICKS={inv_ticks:4d} "
)
print("----------------------------------------------------")
time.sleep(0.25)
except KeyboardInterrupt:
pass
finally:
print("\nExiting. Disconnecting bus...")
device.disconnect()
if __name__ == "__main__":
debug_feetech_positions()

View File

@@ -149,10 +149,15 @@ def configure_motor(port, brand, model, motor_idx_des, baudrate_des):
motor_bus.write("Min_Angle_Limit", 0) # default 0
motor_bus.write("Offset", 0)
motor_bus.write("Mode", 0)
motor_bus.write("Goal_Position", 2048)
motor_bus.write("Goal_Position", 0)
motor_bus.write("Torque_Enable", 0)
motor_bus.write("Lock", 1)
print("Offset", motor_bus.read("Offset"))
# TODO(pepijn): Add when doing a motor configuration, you instantainesly home the position (while assembling)
# single_calibration = single_motor_calibration(motor_bus, motor_idx_des)
# Then store single_calibration in yaml via dict (not overwrite but add the single calibration each time for motor id)
except Exception as e:
print(f"Error occurred during motor configuration: {e}")

View File

@@ -265,25 +265,13 @@ def main():
),
)
parser.add_argument(
"--tolerance-s",
type=float,
default=1e-4,
help=(
"Tolerance in seconds used to ensure data timestamps respect the dataset fps value"
"This is argument passed to the constructor of LeRobotDataset and maps to its tolerance_s constructor argument"
"If not given, defaults to 1e-4."
),
)
args = parser.parse_args()
kwargs = vars(args)
repo_id = kwargs.pop("repo_id")
root = kwargs.pop("root")
tolerance_s = kwargs.pop("tolerance_s")
logging.info("Loading dataset")
dataset = LeRobotDataset(repo_id, root=root, tolerance_s=tolerance_s)
dataset = LeRobotDataset(repo_id, root=root)
visualize_dataset(dataset, **vars(args))

View File

@@ -446,31 +446,15 @@ def main():
help="Delete the output directory if it exists already.",
)
parser.add_argument(
"--tolerance-s",
type=float,
default=1e-4,
help=(
"Tolerance in seconds used to ensure data timestamps respect the dataset fps value"
"This is argument passed to the constructor of LeRobotDataset and maps to its tolerance_s constructor argument"
"If not given, defaults to 1e-4."
),
)
args = parser.parse_args()
kwargs = vars(args)
repo_id = kwargs.pop("repo_id")
load_from_hf_hub = kwargs.pop("load_from_hf_hub")
root = kwargs.pop("root")
tolerance_s = kwargs.pop("tolerance_s")
dataset = None
if repo_id:
dataset = (
LeRobotDataset(repo_id, root=root, tolerance_s=tolerance_s)
if not load_from_hf_hub
else get_dataset_info(repo_id)
)
dataset = LeRobotDataset(repo_id, root=root) if not load_from_hf_hub else get_dataset_info(repo_id)
visualize_dataset_html(dataset, **vars(args))

Binary file not shown.

Before

Width:  |  Height:  |  Size: 509 KiB

After

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 528 KiB

After

Width:  |  Height:  |  Size: 86 KiB

View File

@@ -56,6 +56,7 @@ dependencies = [
"gymnasium==0.29.1", # TODO(rcadene, aliberts): Make gym 1.0.0 work
"h5py>=3.10.0",
"huggingface-hub[hf-transfer,cli]>=0.27.1 ; python_version < '4.0'",
"hydra-core>=1.3.2",
"imageio[ffmpeg]>=2.34.0",
"jsonlines>=4.0.0",
"numba>=0.59.0",
@@ -102,7 +103,30 @@ requires-poetry = ">=2.1"
[tool.ruff]
line-length = 110
target-version = "py310"
exclude = ["tests/artifacts/**/*.safetensors"]
exclude = [
"tests/data",
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".mypy_cache",
".nox",
".pants.d",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"venv",
]
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "I", "N", "B", "C4", "SIM"]

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from huggingface_hub import DatasetCard
from lerobot.common.datasets.utils import create_lerobot_dataset_card
def test_default_parameters():
card = create_lerobot_dataset_card()
assert isinstance(card, DatasetCard)
assert card.data.tags == ["LeRobot"]
assert card.data.task_categories == ["robotics"]
assert card.data.configs == [
{
"config_name": "default",
"data_files": "data/*/*.parquet",
}
]
def test_with_tags():
tags = ["tag1", "tag2"]
card = create_lerobot_dataset_card(tags=tags)
assert card.data.tags == ["LeRobot", "tag1", "tag2"]

View File

@@ -23,7 +23,7 @@ If you know that your change will break backward compatibility, you should write
doesnt need to be merged into the `main` branch. Then you need to run this script and update the tests artifacts.
Example usage:
`python tests/artifacts/datasets/save_dataset_to_safetensors.py`
`python tests/scripts/save_dataset_to_safetensors.py`
"""
import shutil
@@ -88,4 +88,4 @@ if __name__ == "__main__":
"lerobot/nyu_franka_play_dataset",
"lerobot/cmu_stretch",
]:
save_dataset_to_safetensors("tests/artifacts/datasets", repo_id=dataset)
save_dataset_to_safetensors("tests/data/save_dataset_to_safetensors", repo_id=dataset)

View File

@@ -27,7 +27,7 @@ from lerobot.common.datasets.transforms import (
)
from lerobot.common.utils.random_utils import seeded_context
ARTIFACT_DIR = Path("tests/artifacts/image_transforms")
ARTIFACT_DIR = Path("tests/data/save_image_transforms_to_safetensors")
DATASET_REPO_ID = "lerobot/aloha_mobile_shrimp"

View File

@@ -141,5 +141,5 @@ if __name__ == "__main__":
raise RuntimeError("No policies were provided!")
for ds_repo_id, policy, policy_kwargs, file_name_extra in artifacts_cfg:
ds_name = ds_repo_id.split("/")[-1]
output_dir = Path("tests/artifacts/policies") / f"{ds_name}_{policy}_{file_name_extra}"
output_dir = Path("tests/data/save_policy_to_safetensors") / f"{ds_name}_{policy}_{file_name_extra}"
save_policy_to_safetensors(output_dir, ds_repo_id, policy, policy_kwargs)

View File

@@ -146,7 +146,7 @@ def test_camera(request, camera_type, mock):
camera.connect()
if mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2

View File

@@ -51,7 +51,7 @@ from lerobot.configs.control import (
)
from lerobot.configs.policies import PreTrainedConfig
from lerobot.scripts.control_robot import calibrate, record, replay, teleoperate
from tests.robots.test_robots import make_robot
from tests.test_robots import make_robot
from tests.utils import TEST_ROBOT_TYPES, mock_calibration_dir, require_robot

View File

@@ -473,12 +473,12 @@ def test_flatten_unflatten_dict():
)
@require_x86_64_kernel
def test_backward_compatibility(repo_id):
"""The artifacts for this test have been generated by `tests/artifacts/datasets/save_dataset_to_safetensors.py`."""
"""The artifacts for this test have been generated by `tests/scripts/save_dataset_to_safetensors.py`."""
# TODO(rcadene, aliberts): remove dataset download
dataset = LeRobotDataset(repo_id, episodes=[0])
test_dir = Path("tests/artifacts/datasets") / repo_id
test_dir = Path("tests/data/save_dataset_to_safetensors") / repo_id
def load_and_compare(i):
new_frame = dataset[i] # noqa: B023

View File

@@ -23,7 +23,8 @@ from gymnasium.utils.env_checker import check_env
import lerobot
from lerobot.common.envs.factory import make_env, make_env_config
from lerobot.common.envs.utils import preprocess_observation
from tests.utils import require_env
from .utils import require_env
OBS_TYPES = ["state", "pixels", "pixels_agent_pos"]

Some files were not shown because too many files have changed in this diff Show More