Merge branch 'user/aliberts/2025_02_25_refactor_robots' of github.com:huggingface/lerobot into user/aliberts/2025_02_25_refactor_robots

This commit is contained in:
Simon Alibert
2025-03-25 16:24:40 +01:00
9 changed files with 372 additions and 467 deletions

View File

@@ -1,4 +1,29 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import select
import sys
import time
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
from .dynamixel.dynamixel import DynamixelMotorsBus
from .feetech.feetech import FeetechMotorsBus
from .motors_bus import MotorsBus
@dataclass @dataclass
@@ -8,3 +33,146 @@ class MotorCalibration:
homing_offset: int homing_offset: int
range_min: int range_min: int
range_max: int range_max: int
def find_offset(motorbus: MotorsBus):
input("Move robot to the middle of its range of motion and press ENTER....")
for name in motorbus.names:
# Also reset to defaults
if isinstance(motorbus, FeetechMotorsBus):
motorbus.write("Offset", name, 0, raw_value=True)
motorbus.write("Min_Angle_Limit", name, 0, raw_value=True)
motorbus.write("Max_Angle_Limit", name, 4095, raw_value=True)
elif isinstance(motorbus, DynamixelMotorsBus):
motorbus.write("Homing_Offset", name, 0, raw_value=True)
motorbus.write("Min_Position_Limit", name, 0, raw_value=True)
motorbus.write("Max_Position_Limit", name, 4095, raw_value=True)
else:
raise ValueError("Motorbus instance is unknown")
middle_values = motorbus.sync_read("Present_Position", raw_values=True)
offsets = {}
for name, pos in middle_values.items():
offset = pos - 2047 # Center the middle reading at 2047.
MotorCalibration.set_offset(motorbus, offset, name)
offsets[name] = offset
return offsets
def find_min_max(motorbus: MotorsBus):
print("Move all joints (except wrist_roll; id = 5) sequentially through their entire ranges of motion.")
print("Recording positions. Press ENTER to stop...")
recorded_data = {name: [] for name in motorbus.names}
while True:
positions = motorbus.sync_read("Present_Position", raw_values=True)
for name in motorbus.names:
recorded_data[name].append(positions[name])
time.sleep(0.01)
# Check if user pressed Enter
ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
if ready_to_read:
line = sys.stdin.readline()
if line.strip() == "":
break
min_max = {}
for name in motorbus.names:
motor_values = recorded_data[name]
raw_min = min(motor_values)
raw_max = max(motor_values)
if name == "wrist_roll":
physical_min = 0
physical_max = 4095
else:
physical_min = int(raw_min)
physical_max = int(raw_max)
MotorCalibration.set_min_max(motorbus, physical_min, physical_max, name)
min_max[name] = {"min": physical_min, "max": physical_max}
return min_max
def set_calibration(motorbus: MotorsBus, calibration_fpath: Path) -> None:
with open(calibration_fpath) as f:
calibration = json.load(f)
motorbus.calibration = {int(id_): val for id_, val in calibration.items()}
for _, cal_data in motorbus.calibration.items():
name = cal_data.get("name")
if name not in motorbus.names:
print(f"Motor name '{name}' from calibration not found in arm names.")
continue
MotorCalibration.set_offset(motorbus, cal_data["homing_offset"], name)
MotorCalibration.set_min_max(motorbus, cal_data["min"], cal_data["max"], name)
def set_offset(motorbus: MotorsBus, homing_offset: int, name: str):
homing_offset = int(homing_offset)
# Clamp to [-2047..+2047]
if homing_offset > 2047:
homing_offset = 2047
print(
f"Warning: '{homing_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif homing_offset < -2047:
homing_offset = -2047
print(
f"Warning: '{homing_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
direction_bit = 1 if homing_offset < 0 else 0 # Determine the direction (sign) bit and magnitude
magnitude = abs(homing_offset)
servo_offset = (
direction_bit << 11
) | magnitude # Combine sign bit (bit 11) with the magnitude (bits 0..10)
if isinstance(motorbus, FeetechMotorsBus):
motorbus.write("Offset", name, servo_offset, raw_value=True)
read_offset = motorbus.sync_read("Offset", name, raw_values=True)
elif isinstance(motorbus, DynamixelMotorsBus):
motorbus.write("Homing_Offset", name, servo_offset, raw_value=True)
read_offset = motorbus.sync_read("Homing_Offset", name, raw_values=True)
else:
raise ValueError("Motorbus instance is unknown")
if read_offset[name] != servo_offset:
raise ValueError(
f"Offset not set correctly for motor '{name}'. read: {read_offset} does not equal {servo_offset}"
)
def set_min_max(motorbus: MotorsBus, min: int, max: int, name: str):
if isinstance(motorbus, FeetechMotorsBus):
motorbus.write("Min_Angle_Limit", name, min, raw_value=True)
motorbus.write("Max_Angle_Limit", name, max, raw_value=True)
read_min = motorbus.sync_read("Min_Angle_Limit", name, raw_values=True)
read_max = motorbus.sync_read("Max_Angle_Limit", name, raw_values=True)
elif isinstance(motorbus, DynamixelMotorsBus):
motorbus.write("Min_Position_Limit", name, min, raw_value=True)
motorbus.write("Max_Position_Limit", name, max, raw_value=True)
read_min = motorbus.sync_read("Min_Position_Limit", name, raw_values=True)
read_max = motorbus.sync_read("Max_Position_Limit", name, raw_values=True)
else:
raise ValueError("Motorbus instance is unknown")
if read_min[name] != min:
raise ValueError(
f"Min is not set correctly for motor '{name}'. read: {read_min} does not equal {min}"
)
if read_max[name] != max:
raise ValueError(
f"Max is not set correctly for motor '{name}'. read: {read_max} does not equal {max}"
)

View File

@@ -1,3 +1,2 @@
from .feetech import DriveMode, FeetechMotorsBus, OperatingMode, TorqueMode from .feetech import DriveMode, FeetechMotorsBus, OperatingMode, TorqueMode
from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration
from .tables import * from .tables import *

View File

@@ -1,313 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from ..motors_bus import (
CalibrationMode,
MotorsBus,
)
from .feetech import FeetechMotorsBus, TorqueMode
from .tables import MODEL_RESOLUTION
URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
)
def disable_torque(arm: MotorsBus):
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
def get_calibration_modes(arm: MotorsBus):
"""Returns calibration modes for each motor (DEGREE for rotational, LINEAR for gripper)."""
return [
CalibrationMode.LINEAR.name if name == "gripper" else CalibrationMode.DEGREE.name
for name in arm.names
]
def reset_offset(motor_id, motor_bus):
# Open the write lock, changes to EEPROM do NOT persist yet
motor_bus.write("Lock", 1)
# Set offset to 0
motor_name = motor_bus.motor_names[motor_id - 1]
motor_bus.write("Offset", 0, motor_names=[motor_name])
# Close the write lock, changes to EEPROM do persist
motor_bus.write("Lock", 0)
# Confirm that the offset is zero by reading it back
confirmed_offset = motor_bus.read("Offset")[motor_id - 1]
print(f"Offset for motor {motor_id} reset to: {confirmed_offset}")
return confirmed_offset
def calibrate_homing_motor(motor_id, motor_bus):
reset_offset(motor_id, motor_bus)
home_ticks = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts at 0
print(f"Encoder offset (present position in homing position): {home_ticks}")
return home_ticks
def calibrate_linear_motor(motor_id, motor_bus):
motor_names = motor_bus.motor_names
motor_name = motor_names[motor_id - 1]
reset_offset(motor_id, motor_bus)
input(f"Close the {motor_name}, then press Enter...")
start_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
print(f" [Motor {motor_id}] start position recorded: {start_pos}")
input(f"Open the {motor_name} fully, then press Enter...")
end_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
print(f" [Motor {motor_id}] end position recorded: {end_pos}")
return start_pos, end_pos
def single_motor_calibration(arm: MotorsBus, motor_id: int):
"""Calibrates a single motor and returns its calibration data for updating the calibration file."""
disable_torque(arm)
print(f"\n--- Calibrating Motor {motor_id} ---")
start_pos = 0
end_pos = 0
encoder_offset = 0
if motor_id == 6:
start_pos, end_pos = calibrate_linear_motor(motor_id, arm)
else:
input("Move the motor to (zero) position, then press Enter...")
encoder_offset = calibrate_homing_motor(motor_id, arm)
print(f"Calibration for motor ID:{motor_id} done.")
# Create a calibration dictionary for the single motor
calib_dict = {
"homing_offset": int(encoder_offset),
"drive_mode": 0,
"start_pos": int(start_pos),
"end_pos": int(end_pos),
"calib_mode": get_calibration_modes(arm)[motor_id - 1],
"motor_name": arm.names[motor_id - 1],
}
return calib_dict
def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""
Runs a full calibration process for all motors in a robotic arm.
This function calibrates each motor in the arm, determining encoder offsets and
start/end positions for linear and rotational motors. The calibration data is then
stored in a dictionary for later use.
**Calibration Process:**
- The user is prompted to move the arm to its homing position before starting.
- Motors with rotational motion are calibrated using a homing method.
- Linear actuators (e.g., grippers) are calibrated separately.
- Encoder offsets, start positions, and end positions are recorded.
**Example Usage:**
```python
run_full_arm_calibration(arm, "so100", "left", "follower")
```
"""
disable_torque(arm)
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to homing position (middle)")
print(
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero")
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
input("Press Enter to continue...")
start_positions = np.zeros(len(arm.ids))
end_positions = np.zeros(len(arm.ids))
encoder_offsets = np.zeros(len(arm.ids))
modes = get_calibration_modes(arm)
for i, motor_id in enumerate(arm.ids):
if modes[i] == CalibrationMode.DEGREE.name:
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
start_positions[i] = 0
end_positions[i] = 0
for i, motor_id in enumerate(arm.ids):
if modes[i] == CalibrationMode.LINEAR.name:
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
encoder_offsets[i] = 0
print("\nMove arm to rest position")
input("Press Enter to continue...")
print(f"\n calibration of {robot_type} {arm_name} {arm_type} done!")
# Force drive_mode values (can be static)
drive_modes = [0, 1, 0, 0, 1, 0]
calib_dict = {
"homing_offset": encoder_offsets.astype(int).tolist(),
"drive_mode": drive_modes,
"start_pos": start_positions.astype(int).tolist(),
"end_pos": end_positions.astype(int).tolist(),
"calib_mode": get_calibration_modes(arm),
"motor_names": arm.names,
}
return calib_dict
def run_full_auto_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""TODO(pepijn): Add this method later as extra
Example of usage:
```python
run_full_auto_arm_calibration(arm, "so100", "left", "follower")
```
"""
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motor_bus.calibration is not None:
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motor_bus.calibration is not None:
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks

View File

@@ -20,12 +20,10 @@
# https://github.com/astral-sh/ruff/issues/3711 # https://github.com/astral-sh/ruff/issues/3711
import abc import abc
import json
import logging import logging
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from functools import cached_property from functools import cached_property
from pathlib import Path
from pprint import pformat from pprint import pformat
from typing import Protocol, TypeAlias, overload from typing import Protocol, TypeAlias, overload
@@ -414,12 +412,6 @@ class MotorsBus(abc.ABC):
logger.error(e) logger.error(e)
return False return False
def set_calibration(self, calibration_fpath: Path) -> None:
with open(calibration_fpath) as f:
calibration = json.load(f)
self.calibration = {int(id_): val for id_, val in calibration.items()}
@abc.abstractmethod @abc.abstractmethod
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
pass pass

View File

@@ -149,6 +149,7 @@ class KochRobot(Robot):
self.is_connected = True self.is_connected = True
def calibrate(self) -> None: def calibrate(self) -> None:
# TODO(pepijn): Do calibration in same way as so100
"""After calibration all motors function in human interpretable ranges. """After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180], Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100]. and linear motions (like gripper of Aloha) in nominal range of [0, 100].

View File

@@ -18,7 +18,6 @@ and send orders to its motors.
# TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated # TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated
# calibration procedure, to make it easy for people to add their own robot. # calibration procedure, to make it easy for people to add their own robot.
import json
import time import time
import warnings import warnings
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -81,73 +80,6 @@ class ManipulatorRobotConfig(RobotConfig):
) )
def apply_feetech_offsets_from_calibration(motorsbus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
class ManipulatorRobot: class ManipulatorRobot:
# TODO(rcadene): Implement force feedback # TODO(rcadene): Implement force feedback
"""This class allows to control any manipulator robot of various number of motors. """This class allows to control any manipulator robot of various number of motors.
@@ -347,8 +279,6 @@ class ManipulatorRobot:
for name in self.leader_arms: for name in self.leader_arms:
self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value) self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value)
self.activate_calibration()
# Set robot preset (e.g. torque in leader gripper for Koch v1.1) # Set robot preset (e.g. torque in leader gripper for Koch v1.1)
if self.robot_type in ["koch", "koch_bimanual"]: if self.robot_type in ["koch", "koch_bimanual"]:
self.set_koch_robot_preset() self.set_koch_robot_preset()
@@ -385,61 +315,6 @@ class ManipulatorRobot:
self.is_connected = True self.is_connected = True
def activate_calibration(self):
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
def load_or_run_calibration_(name, arm, arm_type):
arm_id = get_arm_id(name, arm_type)
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
print(f"Missing calibration file '{arm_calib_path}'")
if self.robot_type in ["koch", "koch_bimanual", "aloha"]:
from lerobot.common.motors.dynamixel.dynamixel_calibration import run_arm_calibration
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type)
elif self.robot_type in ["so100", "moss", "lekiwi"]:
from lerobot.common.motors.feetech.feetech_calibration import (
run_full_arm_calibration,
)
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
return calibration
# For each follower arm
for name, arm_bus in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "follower")
arm_bus.set_calibration(calibration)
# If this is a Feetech robot, also set the servo offset into EEPROM
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
# For each leader arm
for name, arm_bus in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "leader")
arm_bus.set_calibration(calibration)
# Optionally also set offset for leader if you want the servo offsets as well
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
def set_koch_robot_preset(self): def set_koch_robot_preset(self):
def set_operating_mode_(arm): def set_operating_mode_(arm):
from lerobot.common.motors.dynamixel.dynamixel import TorqueMode from lerobot.common.motors.dynamixel.dynamixel import TorqueMode
@@ -540,9 +415,6 @@ class ManipulatorRobot:
# Set I_Coefficient and D_Coefficient to default value 0 and 32 # Set I_Coefficient and D_Coefficient to default value 0 and 32
self.follower_arms[name].write("I_Coefficient", 0) self.follower_arms[name].write("I_Coefficient", 0)
self.follower_arms[name].write("D_Coefficient", 32) self.follower_arms[name].write("D_Coefficient", 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.follower_arms[name].write("Lock", 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table # the motors. Note: this configuration is not in the official STS3215 Memory Table
self.follower_arms[name].write("Maximum_Acceleration", 254) self.follower_arms[name].write("Maximum_Acceleration", 254)

View File

@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
import time import time
@@ -23,11 +24,11 @@ from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor from lerobot.common.motors import CalibrationMode, Motor
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
from lerobot.common.motors.feetech import ( from lerobot.common.motors.feetech import (
FeetechMotorsBus, FeetechMotorsBus,
OperatingMode, OperatingMode,
TorqueMode, TorqueMode,
apply_feetech_offsets_from_calibration,
) )
from ..robot import Robot from ..robot import Robot
@@ -97,15 +98,17 @@ class SO100Robot(Robot):
# Set I_Coefficient and D_Coefficient to default value 0 and 32 # Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", name, 0) self.arm.write("I_Coefficient", name, 0)
self.arm.write("D_Coefficient", name, 32) self.arm.write("D_Coefficient", name, 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.arm.write("Lock", name, 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table # the motors. Note: this configuration is not in the official STS3215 Memory Table
self.arm.write("Maximum_Acceleration", name, 254) self.arm.write("Maximum_Acceleration", name, 254)
self.arm.write("Acceleration", name, 254) self.arm.write("Acceleration", name, 254)
self.calibrate() if not self.calibration_fpath.exists():
print("Calibration file not found. Running calibration.")
self.calibrate()
else:
print("Calibration file found. Loading calibration data.")
set_calibration(self.arm, self.calibration_fpath)
for name in self.arm.names: for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value)
@@ -135,15 +138,25 @@ class SO100Robot(Robot):
cam.connect() cam.connect()
def calibrate(self) -> None: def calibrate(self) -> None:
raise NotImplementedError print(f"\nRunning calibration of {self.name} robot")
def set_calibration(self) -> None: offsets = find_offset(self.arm)
if not self.calibration_fpath.exists(): min_max = find_min_max(self.arm)
logging.error("Calibration file not found. Please run calibration first")
raise FileNotFoundError(self.calibration_fpath)
self.arm.set_calibration(self.calibration_fpath) calibration = {}
apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration) for name in self.arm.names:
motor_id = self.arm.motors[name].id
calibration[str(motor_id)] = {
"name": name,
"homing_offset": offsets.get(name, 0),
"drive_mode": 0,
"min": min_max[name]["min"],
"max": min_max[name]["max"],
}
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f, indent=4)
print("Calibration saved to", self.calibration_fpath)
def get_observation(self) -> dict[str, np.ndarray]: def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension.""" """The returned observations do not have a batch dimension."""

View File

@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
import time import time
@@ -21,11 +22,11 @@ import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor from lerobot.common.motors import CalibrationMode, Motor
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
from lerobot.common.motors.feetech import ( from lerobot.common.motors.feetech import (
FeetechMotorsBus, FeetechMotorsBus,
TorqueMode,
apply_feetech_offsets_from_calibration,
) )
from lerobot.common.motors.feetech.feetech import TorqueMode
from ..teleoperator import Teleoperator from ..teleoperator import Teleoperator
from .configuration_so100 import SO100TeleopConfig from .configuration_so100 import SO100TeleopConfig
@@ -71,6 +72,13 @@ class SO100Teleop(Teleoperator):
return {} return {}
def configure(self) -> None: def configure(self) -> None:
if not self.calibration_fpath.exists():
print("Calibration file not found. Running calibration.")
self.calibrate()
else:
print("Calibration file found. Loading calibration data.")
set_calibration(self.arm, self.calibration_fpath)
# We assume that at connection time, arm is in a rest position, # We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration. # and torque can be safely disabled to run calibration.
for name in self.arm.names: for name in self.arm.names:
@@ -89,13 +97,32 @@ class SO100Teleop(Teleoperator):
logging.info("Connecting arm.") logging.info("Connecting arm.")
self.arm.connect() self.arm.connect()
self.configure() self.configure()
self.calibrate()
# Check arm can be read # Check arm can be read
self.arm.sync_read("Present_Position") self.arm.sync_read("Present_Position")
def calibrate(self) -> None: def calibrate(self) -> None:
raise NotImplementedError print(f"\nRunning calibration of {self.name} teleop")
for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
offsets = find_offset(self.arm)
min_max = find_min_max(self.arm)
calibration = {}
for name in self.arm.names:
motor_id = self.arm.motors[name].id
calibration[str(motor_id)] = {
"name": name,
"homing_offset": offsets.get(name, 0),
"drive_mode": 0,
"min": min_max[name]["min"],
"max": min_max[name]["max"],
}
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f, indent=4)
print("Calibration saved to", self.calibration_fpath)
def set_calibration(self) -> None: def set_calibration(self) -> None:
"""After calibration all motors function in human interpretable ranges. """After calibration all motors function in human interpretable ranges.
@@ -107,7 +134,6 @@ class SO100Teleop(Teleoperator):
raise FileNotFoundError(self.calibration_fpath) raise FileNotFoundError(self.calibration_fpath)
self.arm.set_calibration(self.calibration_fpath) self.arm.set_calibration(self.calibration_fpath)
apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
def get_action(self) -> np.ndarray: def get_action(self) -> np.ndarray:
"""The returned action does not have a batch dimension.""" """The returned action does not have a batch dimension."""

View File

@@ -0,0 +1,147 @@
import sys
from typing import Generator
from unittest.mock import Mock, call, patch
import pytest
import scservo_sdk as scs
from lerobot.common.motors import CalibrationMode, Motor
from lerobot.common.motors.calibration import find_min_max, find_offset, set_min_max, set_offset
from lerobot.common.motors.feetech import FeetechMotorsBus
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
@pytest.fixture(autouse=True)
def patch_port_handler():
if sys.platform == "darwin":
with patch.object(scs, "PortHandler", MockPortHandler):
yield
else:
yield
@pytest.fixture
def mock_motors() -> Generator[MockMotors, None, None]:
motors = MockMotors()
motors.open()
yield motors
motors.close()
@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
return {
"dummy_1": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_roll": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
"dummy_3": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
}
@pytest.fixture(autouse=True)
def patch_broadcast_ping():
with patch.object(FeetechMotorsBus, "broadcast_ping", return_value={1: 777, 2: 777, 3: 777}):
yield
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
assert scs.PortHandler is MockPortHandler
@pytest.mark.parametrize(
"motor_names, read_values",
[
(
["dummy_1"],
[{"dummy_1": 3000}],
),
],
ids=["two-motors"],
)
def test_find_offset(mock_motors, dummy_motors, motor_names, read_values):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
with (
patch("builtins.input", return_value=""),
patch("lerobot.common.motors.calibration.set_offset") as mock_set_offset,
):
motors_bus.sync_read = Mock(side_effect=[{"dummy_1": 3000}])
motors_bus.motor_names = motor_names
motors_bus.write = Mock(return_value=None)
find_offset(motors_bus)
# Compute the expected offset: 3000 - 2047 = 953.
expected_calls = [call(motors_bus, 953, "dummy_1")]
mock_set_offset.assert_has_calls(expected_calls, any_order=False)
def test_find_min_max(mock_motors, dummy_motors):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.motor_names = list(dummy_motors.keys())
read_side_effect = [
{"dummy_1": 10, "wrist_roll": 20, "dummy_3": 30}, # For first sync_read call.
{"dummy_1": 4000, "wrist_roll": 2000, "dummy_3": 100}, # For second sync_read call.
{"dummy_1": 100, "wrist_roll": 4050, "dummy_3": 2010}, # For third sync_read call.
]
motors_bus.sync_read = Mock(side_effect=read_side_effect)
select_returns = [
([], [], []), # First iteration: no input.
([], [], []), # Second iteration.
([sys.stdin], [], []), # Third iteration: simulate pressing ENTER.
]
with (
patch("lerobot.common.motors.calibration.set_min_max") as mock_set_min_max,
patch("lerobot.common.motors.calibration.select.select", side_effect=select_returns),
patch("sys.stdin.readline", return_value="\n"),
):
find_min_max(motors_bus)
mock_set_min_max.assert_any_call(motors_bus, 10, 4000, "dummy_1")
mock_set_min_max.assert_any_call(motors_bus, 0, 4095, "wrist_roll") # wrist_roll is forced to [0,4095]
mock_set_min_max.assert_any_call(motors_bus, 30, 2010, "dummy_3")
assert mock_set_min_max.call_count == 3
def test_set_offset_clamping(mock_motors, dummy_motors):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.sync_read = Mock(return_value={"dummy_1": 2047})
motors_bus.write = Mock()
# A very large offset should be clamped to +2047.
set_offset(motors_bus, 9999, "dummy_1")
motors_bus.write.assert_any_call("Offset", "dummy_1", 2047, raw_value=True)
def test_set_min_max(mock_motors, dummy_motors):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
def _sync_read_side_effect(data_name, motors, *, raw_values=False):
if data_name == "Min_Angle_Limit":
return {"dummy_1": 100}
elif data_name == "Max_Angle_Limit":
return {"dummy_1": 3000}
return {}
motors_bus.sync_read = Mock(side_effect=_sync_read_side_effect)
motors_bus.write = Mock()
set_min_max(motors_bus, 100, 3000, "dummy_1")
motors_bus.write.assert_any_call("Min_Angle_Limit", "dummy_1", 100, raw_value=True)
motors_bus.write.assert_any_call("Max_Angle_Limit", "dummy_1", 3000, raw_value=True)