diff --git a/lerobot/common/motors/calibration.py b/lerobot/common/motors/calibration.py index f503a972..8329092f 100644 --- a/lerobot/common/motors/calibration.py +++ b/lerobot/common/motors/calibration.py @@ -1,4 +1,29 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import select +import sys +import time from dataclasses import dataclass +from pathlib import Path + +from .dynamixel.dynamixel import DynamixelMotorsBus +from .feetech.feetech import FeetechMotorsBus +from .motors_bus import MotorsBus @dataclass @@ -8,3 +33,146 @@ class MotorCalibration: homing_offset: int range_min: int range_max: int + + +def find_offset(motorbus: MotorsBus): + input("Move robot to the middle of its range of motion and press ENTER....") + for name in motorbus.names: + # Also reset to defaults + if isinstance(motorbus, FeetechMotorsBus): + motorbus.write("Offset", name, 0, raw_value=True) + motorbus.write("Min_Angle_Limit", name, 0, raw_value=True) + motorbus.write("Max_Angle_Limit", name, 4095, raw_value=True) + elif isinstance(motorbus, DynamixelMotorsBus): + motorbus.write("Homing_Offset", name, 0, raw_value=True) + motorbus.write("Min_Position_Limit", name, 0, raw_value=True) + motorbus.write("Max_Position_Limit", name, 4095, raw_value=True) + else: + raise ValueError("Motorbus instance is unknown") + + middle_values = motorbus.sync_read("Present_Position", raw_values=True) + + offsets = {} + for name, pos in middle_values.items(): + offset = pos - 2047 # Center the middle reading at 2047. + MotorCalibration.set_offset(motorbus, offset, name) + offsets[name] = offset + + return offsets + + +def find_min_max(motorbus: MotorsBus): + print("Move all joints (except wrist_roll; id = 5) sequentially through their entire ranges of motion.") + print("Recording positions. Press ENTER to stop...") + + recorded_data = {name: [] for name in motorbus.names} + + while True: + positions = motorbus.sync_read("Present_Position", raw_values=True) + for name in motorbus.names: + recorded_data[name].append(positions[name]) + time.sleep(0.01) + + # Check if user pressed Enter + ready_to_read, _, _ = select.select([sys.stdin], [], [], 0) + if ready_to_read: + line = sys.stdin.readline() + if line.strip() == "": + break + + min_max = {} + for name in motorbus.names: + motor_values = recorded_data[name] + raw_min = min(motor_values) + raw_max = max(motor_values) + + if name == "wrist_roll": + physical_min = 0 + physical_max = 4095 + else: + physical_min = int(raw_min) + physical_max = int(raw_max) + + MotorCalibration.set_min_max(motorbus, physical_min, physical_max, name) + min_max[name] = {"min": physical_min, "max": physical_max} + + return min_max + + +def set_calibration(motorbus: MotorsBus, calibration_fpath: Path) -> None: + with open(calibration_fpath) as f: + calibration = json.load(f) + + motorbus.calibration = {int(id_): val for id_, val in calibration.items()} + + for _, cal_data in motorbus.calibration.items(): + name = cal_data.get("name") + if name not in motorbus.names: + print(f"Motor name '{name}' from calibration not found in arm names.") + continue + + MotorCalibration.set_offset(motorbus, cal_data["homing_offset"], name) + MotorCalibration.set_min_max(motorbus, cal_data["min"], cal_data["max"], name) + + +def set_offset(motorbus: MotorsBus, homing_offset: int, name: str): + homing_offset = int(homing_offset) + + # Clamp to [-2047..+2047] + if homing_offset > 2047: + homing_offset = 2047 + print( + f"Warning: '{homing_offset}' is getting clamped because its larger then 2047; This should not happen!" + ) + elif homing_offset < -2047: + homing_offset = -2047 + print( + f"Warning: '{homing_offset}' is getting clamped because its smaller then -2047; This should not happen!" + ) + + direction_bit = 1 if homing_offset < 0 else 0 # Determine the direction (sign) bit and magnitude + magnitude = abs(homing_offset) + servo_offset = ( + direction_bit << 11 + ) | magnitude # Combine sign bit (bit 11) with the magnitude (bits 0..10) + + if isinstance(motorbus, FeetechMotorsBus): + motorbus.write("Offset", name, servo_offset, raw_value=True) + read_offset = motorbus.sync_read("Offset", name, raw_values=True) + elif isinstance(motorbus, DynamixelMotorsBus): + motorbus.write("Homing_Offset", name, servo_offset, raw_value=True) + read_offset = motorbus.sync_read("Homing_Offset", name, raw_values=True) + else: + raise ValueError("Motorbus instance is unknown") + + if read_offset[name] != servo_offset: + raise ValueError( + f"Offset not set correctly for motor '{name}'. read: {read_offset} does not equal {servo_offset}" + ) + + +def set_min_max(motorbus: MotorsBus, min: int, max: int, name: str): + if isinstance(motorbus, FeetechMotorsBus): + motorbus.write("Min_Angle_Limit", name, min, raw_value=True) + motorbus.write("Max_Angle_Limit", name, max, raw_value=True) + + read_min = motorbus.sync_read("Min_Angle_Limit", name, raw_values=True) + read_max = motorbus.sync_read("Max_Angle_Limit", name, raw_values=True) + elif isinstance(motorbus, DynamixelMotorsBus): + motorbus.write("Min_Position_Limit", name, min, raw_value=True) + motorbus.write("Max_Position_Limit", name, max, raw_value=True) + + read_min = motorbus.sync_read("Min_Position_Limit", name, raw_values=True) + read_max = motorbus.sync_read("Max_Position_Limit", name, raw_values=True) + else: + raise ValueError("Motorbus instance is unknown") + + if read_min[name] != min: + raise ValueError( + f"Min is not set correctly for motor '{name}'. read: {read_min} does not equal {min}" + ) + + if read_max[name] != max: + raise ValueError( + f"Max is not set correctly for motor '{name}'. read: {read_max} does not equal {max}" + ) diff --git a/lerobot/common/motors/feetech/__init__.py b/lerobot/common/motors/feetech/__init__.py index dbe9a08d..911d1d19 100644 --- a/lerobot/common/motors/feetech/__init__.py +++ b/lerobot/common/motors/feetech/__init__.py @@ -1,3 +1,2 @@ from .feetech import DriveMode, FeetechMotorsBus, OperatingMode, TorqueMode -from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration from .tables import * diff --git a/lerobot/common/motors/feetech/feetech_calibration.py b/lerobot/common/motors/feetech/feetech_calibration.py deleted file mode 100644 index 098370d0..00000000 --- a/lerobot/common/motors/feetech/feetech_calibration.py +++ /dev/null @@ -1,313 +0,0 @@ -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import numpy as np - -from ..motors_bus import ( - CalibrationMode, - MotorsBus, -) -from .feetech import FeetechMotorsBus, TorqueMode -from .tables import MODEL_RESOLUTION - -URL_TEMPLATE = ( - "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp" -) - - -def disable_torque(arm: MotorsBus): - if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any(): - raise ValueError("To run calibration, the torque must be disabled on all motors.") - - -def get_calibration_modes(arm: MotorsBus): - """Returns calibration modes for each motor (DEGREE for rotational, LINEAR for gripper).""" - return [ - CalibrationMode.LINEAR.name if name == "gripper" else CalibrationMode.DEGREE.name - for name in arm.names - ] - - -def reset_offset(motor_id, motor_bus): - # Open the write lock, changes to EEPROM do NOT persist yet - motor_bus.write("Lock", 1) - - # Set offset to 0 - motor_name = motor_bus.motor_names[motor_id - 1] - motor_bus.write("Offset", 0, motor_names=[motor_name]) - - # Close the write lock, changes to EEPROM do persist - motor_bus.write("Lock", 0) - - # Confirm that the offset is zero by reading it back - confirmed_offset = motor_bus.read("Offset")[motor_id - 1] - print(f"Offset for motor {motor_id} reset to: {confirmed_offset}") - return confirmed_offset - - -def calibrate_homing_motor(motor_id, motor_bus): - reset_offset(motor_id, motor_bus) - - home_ticks = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts at 0 - print(f"Encoder offset (present position in homing position): {home_ticks}") - - return home_ticks - - -def calibrate_linear_motor(motor_id, motor_bus): - motor_names = motor_bus.motor_names - motor_name = motor_names[motor_id - 1] - - reset_offset(motor_id, motor_bus) - - input(f"Close the {motor_name}, then press Enter...") - start_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0 - print(f" [Motor {motor_id}] start position recorded: {start_pos}") - - input(f"Open the {motor_name} fully, then press Enter...") - end_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0 - print(f" [Motor {motor_id}] end position recorded: {end_pos}") - - return start_pos, end_pos - - -def single_motor_calibration(arm: MotorsBus, motor_id: int): - """Calibrates a single motor and returns its calibration data for updating the calibration file.""" - - disable_torque(arm) - print(f"\n--- Calibrating Motor {motor_id} ---") - - start_pos = 0 - end_pos = 0 - encoder_offset = 0 - - if motor_id == 6: - start_pos, end_pos = calibrate_linear_motor(motor_id, arm) - else: - input("Move the motor to (zero) position, then press Enter...") - encoder_offset = calibrate_homing_motor(motor_id, arm) - - print(f"Calibration for motor ID:{motor_id} done.") - - # Create a calibration dictionary for the single motor - calib_dict = { - "homing_offset": int(encoder_offset), - "drive_mode": 0, - "start_pos": int(start_pos), - "end_pos": int(end_pos), - "calib_mode": get_calibration_modes(arm)[motor_id - 1], - "motor_name": arm.names[motor_id - 1], - } - - return calib_dict - - -def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str): - """ - Runs a full calibration process for all motors in a robotic arm. - - This function calibrates each motor in the arm, determining encoder offsets and - start/end positions for linear and rotational motors. The calibration data is then - stored in a dictionary for later use. - - **Calibration Process:** - - The user is prompted to move the arm to its homing position before starting. - - Motors with rotational motion are calibrated using a homing method. - - Linear actuators (e.g., grippers) are calibrated separately. - - Encoder offsets, start positions, and end positions are recorded. - - **Example Usage:** - ```python - run_full_arm_calibration(arm, "so100", "left", "follower") - ``` - """ - disable_torque(arm) - - print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...") - - print("\nMove arm to homing position (middle)") - print( - "See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero") - ) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial - input("Press Enter to continue...") - - start_positions = np.zeros(len(arm.ids)) - end_positions = np.zeros(len(arm.ids)) - encoder_offsets = np.zeros(len(arm.ids)) - - modes = get_calibration_modes(arm) - - for i, motor_id in enumerate(arm.ids): - if modes[i] == CalibrationMode.DEGREE.name: - encoder_offsets[i] = calibrate_homing_motor(motor_id, arm) - start_positions[i] = 0 - end_positions[i] = 0 - - for i, motor_id in enumerate(arm.ids): - if modes[i] == CalibrationMode.LINEAR.name: - start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm) - encoder_offsets[i] = 0 - - print("\nMove arm to rest position") - input("Press Enter to continue...") - - print(f"\n calibration of {robot_type} {arm_name} {arm_type} done!") - - # Force drive_mode values (can be static) - drive_modes = [0, 1, 0, 0, 1, 0] - - calib_dict = { - "homing_offset": encoder_offsets.astype(int).tolist(), - "drive_mode": drive_modes, - "start_pos": start_positions.astype(int).tolist(), - "end_pos": end_positions.astype(int).tolist(), - "calib_mode": get_calibration_modes(arm), - "motor_names": arm.names, - } - return calib_dict - - -def run_full_auto_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str): - """TODO(pepijn): Add this method later as extra - Example of usage: - ```python - run_full_auto_arm_calibration(arm, "so100", "left", "follower") - ``` - """ - print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...") - - -def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibration_dict: dict): - """ - Reads 'calibration_dict' containing 'homing_offset' and 'motor_names', - then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM. - - This version is modified so each homed position (originally 0) will now read - 2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently - stored in EEPROM, so the servo's Present_Position is hardware-shifted even - after power cycling. - - Steps: - 1) Subtract 2047 from the old offset (so 0 -> 2047). - 2) Clamp to [-2047..+2047]. - 3) Encode sign bit and magnitude into a 12-bit number. - """ - - homing_offsets = calibration_dict["homing_offset"] - motor_names = calibration_dict["motor_names"] - start_pos = calibration_dict["start_pos"] - - # Open the write lock, changes to EEPROM do NOT persist yet - motorsbus.write("Lock", 1) - - # For each motor, set the 'Offset' parameter - for m_name, old_offset in zip(motor_names, homing_offsets, strict=False): - # If bus doesn’t have a motor named m_name, skip - if m_name not in motorsbus.motors: - print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.") - continue - - if m_name == "gripper": - old_offset = start_pos # If gripper set the offset to the start position of the gripper - continue - - # Shift the offset so the homed position reads 2047 - new_offset = old_offset - 2047 - - # Clamp to [-2047..+2047] - if new_offset > 2047: - new_offset = 2047 - print( - f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!" - ) - elif new_offset < -2047: - new_offset = -2047 - print( - f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!" - ) - - # Determine the direction (sign) bit and magnitude - direction_bit = 1 if new_offset < 0 else 0 - magnitude = abs(new_offset) - - # Combine sign bit (bit 11) with the magnitude (bits 0..10) - servo_offset = (direction_bit << 11) | magnitude - - # Write offset to servo - motorsbus.write("Offset", servo_offset, motor_names=m_name) - print( - f"Set offset for {m_name}: " - f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}" - ) - - motorsbus.write("Lock", 0) - print("Offsets have been saved to EEPROM successfully.") - - -def convert_ticks_to_degrees(ticks, model): - resolutions = MODEL_RESOLUTION[model] - # Convert the ticks to degrees - return ticks * (360.0 / resolutions) - - -def convert_degrees_to_ticks(degrees, model): - resolutions = MODEL_RESOLUTION[model] - # Convert degrees to motor ticks - return int(degrees * (resolutions / 360.0)) - - -def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int: - """ - Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048' - becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution). - """ - resolutions = MODEL_RESOLUTION[model] - - # Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1]. - ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions - - # If above halfway, fold it into negative territory => [-2048..+2047]. - if ticks > (resolutions // 2): - ticks -= resolutions - - # Flip sign if drive_mode is set. - drive_mode = 0 - if motor_bus.calibration is not None: - drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1] - - if drive_mode: - ticks *= -1 - - return ticks - - -def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int: - """ - Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047] - and recovers the raw [0..(res-1)] ticks with 2048 as midpoint. - """ - # Flip sign if drive_mode was set. - drive_mode = 0 - if motor_bus.calibration is not None: - drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1] - - if drive_mode: - adjusted_pos *= -1 - - resolutions = MODEL_RESOLUTION[model] - - # Shift by +half-resolution and wrap into [0..res-1]. - # This undoes the earlier shift by -half-resolution. - ticks = (adjusted_pos + (resolutions // 2)) % resolutions - - return ticks diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 40cda4f6..71d01e49 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -20,12 +20,10 @@ # https://github.com/astral-sh/ruff/issues/3711 import abc -import json import logging from dataclasses import dataclass from enum import Enum from functools import cached_property -from pathlib import Path from pprint import pformat from typing import Protocol, TypeAlias, overload @@ -414,12 +412,6 @@ class MotorsBus(abc.ABC): logger.error(e) return False - def set_calibration(self, calibration_fpath: Path) -> None: - with open(calibration_fpath) as f: - calibration = json.load(f) - - self.calibration = {int(id_): val for id_, val in calibration.items()} - @abc.abstractmethod def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: pass diff --git a/lerobot/common/robots/koch/robot_koch.py b/lerobot/common/robots/koch/robot_koch.py index 90a63776..fa1ee284 100644 --- a/lerobot/common/robots/koch/robot_koch.py +++ b/lerobot/common/robots/koch/robot_koch.py @@ -149,6 +149,7 @@ class KochRobot(Robot): self.is_connected = True def calibrate(self) -> None: + # TODO(pepijn): Do calibration in same way as so100 """After calibration all motors function in human interpretable ranges. Rotations are expressed in degrees in nominal range of [-180, 180], and linear motions (like gripper of Aloha) in nominal range of [0, 100]. diff --git a/lerobot/common/robots/manipulator.py b/lerobot/common/robots/manipulator.py index 5bdddd03..c5507ecf 100644 --- a/lerobot/common/robots/manipulator.py +++ b/lerobot/common/robots/manipulator.py @@ -18,7 +18,6 @@ and send orders to its motors. # TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated # calibration procedure, to make it easy for people to add their own robot. -import json import time import warnings from dataclasses import dataclass, field @@ -81,73 +80,6 @@ class ManipulatorRobotConfig(RobotConfig): ) -def apply_feetech_offsets_from_calibration(motorsbus, calibration_dict: dict): - """ - Reads 'calibration_dict' containing 'homing_offset' and 'motor_names', - then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM. - - This version is modified so each homed position (originally 0) will now read - 2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently - stored in EEPROM, so the servo's Present_Position is hardware-shifted even - after power cycling. - - Steps: - 1) Subtract 2047 from the old offset (so 0 -> 2047). - 2) Clamp to [-2047..+2047]. - 3) Encode sign bit and magnitude into a 12-bit number. - """ - - homing_offsets = calibration_dict["homing_offset"] - motor_names = calibration_dict["motor_names"] - start_pos = calibration_dict["start_pos"] - - # Open the write lock, changes to EEPROM do NOT persist yet - motorsbus.write("Lock", 1) - - # For each motor, set the 'Offset' parameter - for m_name, old_offset in zip(motor_names, homing_offsets, strict=False): - # If bus doesn’t have a motor named m_name, skip - if m_name not in motorsbus.motors: - print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.") - continue - - if m_name == "gripper": - old_offset = start_pos # If gripper set the offset to the start position of the gripper - continue - - # Shift the offset so the homed position reads 2047 - new_offset = old_offset - 2047 - - # Clamp to [-2047..+2047] - if new_offset > 2047: - new_offset = 2047 - print( - f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!" - ) - elif new_offset < -2047: - new_offset = -2047 - print( - f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!" - ) - - # Determine the direction (sign) bit and magnitude - direction_bit = 1 if new_offset < 0 else 0 - magnitude = abs(new_offset) - - # Combine sign bit (bit 11) with the magnitude (bits 0..10) - servo_offset = (direction_bit << 11) | magnitude - - # Write offset to servo - motorsbus.write("Offset", servo_offset, motor_names=m_name) - print( - f"Set offset for {m_name}: " - f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}" - ) - - motorsbus.write("Lock", 0) - print("Offsets have been saved to EEPROM successfully.") - - class ManipulatorRobot: # TODO(rcadene): Implement force feedback """This class allows to control any manipulator robot of various number of motors. @@ -347,8 +279,6 @@ class ManipulatorRobot: for name in self.leader_arms: self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value) - self.activate_calibration() - # Set robot preset (e.g. torque in leader gripper for Koch v1.1) if self.robot_type in ["koch", "koch_bimanual"]: self.set_koch_robot_preset() @@ -385,61 +315,6 @@ class ManipulatorRobot: self.is_connected = True - def activate_calibration(self): - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - - def load_or_run_calibration_(name, arm, arm_type): - arm_id = get_arm_id(name, arm_type) - arm_calib_path = self.calibration_dir / f"{arm_id}.json" - - if arm_calib_path.exists(): - with open(arm_calib_path) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - print(f"Missing calibration file '{arm_calib_path}'") - - if self.robot_type in ["koch", "koch_bimanual", "aloha"]: - from lerobot.common.motors.dynamixel.dynamixel_calibration import run_arm_calibration - - calibration = run_arm_calibration(arm, self.robot_type, name, arm_type) - - elif self.robot_type in ["so100", "moss", "lekiwi"]: - from lerobot.common.motors.feetech.feetech_calibration import ( - run_full_arm_calibration, - ) - - calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type) - - print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") - arm_calib_path.parent.mkdir(parents=True, exist_ok=True) - with open(arm_calib_path, "w") as f: - json.dump(calibration, f) - - return calibration - - # For each follower arm - - for name, arm_bus in self.follower_arms.items(): - calibration = load_or_run_calibration_(name, arm_bus, "follower") - arm_bus.set_calibration(calibration) - - # If this is a Feetech robot, also set the servo offset into EEPROM - if self.robot_type in ["so100", "lekiwi"]: - apply_feetech_offsets_from_calibration(arm_bus, calibration) - - # For each leader arm - for name, arm_bus in self.leader_arms.items(): - calibration = load_or_run_calibration_(name, arm_bus, "leader") - arm_bus.set_calibration(calibration) - - # Optionally also set offset for leader if you want the servo offsets as well - if self.robot_type in ["so100", "lekiwi"]: - apply_feetech_offsets_from_calibration(arm_bus, calibration) - def set_koch_robot_preset(self): def set_operating_mode_(arm): from lerobot.common.motors.dynamixel.dynamixel import TorqueMode @@ -540,9 +415,6 @@ class ManipulatorRobot: # Set I_Coefficient and D_Coefficient to default value 0 and 32 self.follower_arms[name].write("I_Coefficient", 0) self.follower_arms[name].write("D_Coefficient", 32) - # Close the write lock so that Maximum_Acceleration gets written to EPROM address, - # which is mandatory for Maximum_Acceleration to take effect after rebooting. - self.follower_arms[name].write("Lock", 0) # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of # the motors. Note: this configuration is not in the official STS3215 Memory Table self.follower_arms[name].write("Maximum_Acceleration", 254) diff --git a/lerobot/common/robots/so100/robot_so100.py b/lerobot/common/robots/so100/robot_so100.py index a0321477..f05d7bef 100644 --- a/lerobot/common/robots/so100/robot_so100.py +++ b/lerobot/common/robots/so100/robot_so100.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import logging import time @@ -23,11 +24,11 @@ from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import CalibrationMode, Motor +from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration from lerobot.common.motors.feetech import ( FeetechMotorsBus, OperatingMode, TorqueMode, - apply_feetech_offsets_from_calibration, ) from ..robot import Robot @@ -97,15 +98,17 @@ class SO100Robot(Robot): # Set I_Coefficient and D_Coefficient to default value 0 and 32 self.arm.write("I_Coefficient", name, 0) self.arm.write("D_Coefficient", name, 32) - # Close the write lock so that Maximum_Acceleration gets written to EPROM address, - # which is mandatory for Maximum_Acceleration to take effect after rebooting. - self.arm.write("Lock", name, 0) # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of # the motors. Note: this configuration is not in the official STS3215 Memory Table self.arm.write("Maximum_Acceleration", name, 254) self.arm.write("Acceleration", name, 254) - self.calibrate() + if not self.calibration_fpath.exists(): + print("Calibration file not found. Running calibration.") + self.calibrate() + else: + print("Calibration file found. Loading calibration data.") + set_calibration(self.arm, self.calibration_fpath) for name in self.arm.names: self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) @@ -135,15 +138,25 @@ class SO100Robot(Robot): cam.connect() def calibrate(self) -> None: - raise NotImplementedError + print(f"\nRunning calibration of {self.name} robot") - def set_calibration(self) -> None: - if not self.calibration_fpath.exists(): - logging.error("Calibration file not found. Please run calibration first") - raise FileNotFoundError(self.calibration_fpath) + offsets = find_offset(self.arm) + min_max = find_min_max(self.arm) - self.arm.set_calibration(self.calibration_fpath) - apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration) + calibration = {} + for name in self.arm.names: + motor_id = self.arm.motors[name].id + calibration[str(motor_id)] = { + "name": name, + "homing_offset": offsets.get(name, 0), + "drive_mode": 0, + "min": min_max[name]["min"], + "max": min_max[name]["max"], + } + + with open(self.calibration_fpath, "w") as f: + json.dump(calibration, f, indent=4) + print("Calibration saved to", self.calibration_fpath) def get_observation(self) -> dict[str, np.ndarray]: """The returned observations do not have a batch dimension.""" diff --git a/lerobot/common/teleoperators/so100/teleop_so100.py b/lerobot/common/teleoperators/so100/teleop_so100.py index ab0bf3c0..89524165 100644 --- a/lerobot/common/teleoperators/so100/teleop_so100.py +++ b/lerobot/common/teleoperators/so100/teleop_so100.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import logging import time @@ -21,11 +22,11 @@ import numpy as np from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import CalibrationMode, Motor +from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration from lerobot.common.motors.feetech import ( FeetechMotorsBus, - TorqueMode, - apply_feetech_offsets_from_calibration, ) +from lerobot.common.motors.feetech.feetech import TorqueMode from ..teleoperator import Teleoperator from .configuration_so100 import SO100TeleopConfig @@ -71,6 +72,13 @@ class SO100Teleop(Teleoperator): return {} def configure(self) -> None: + if not self.calibration_fpath.exists(): + print("Calibration file not found. Running calibration.") + self.calibrate() + else: + print("Calibration file found. Loading calibration data.") + set_calibration(self.arm, self.calibration_fpath) + # We assume that at connection time, arm is in a rest position, # and torque can be safely disabled to run calibration. for name in self.arm.names: @@ -89,13 +97,32 @@ class SO100Teleop(Teleoperator): logging.info("Connecting arm.") self.arm.connect() self.configure() - self.calibrate() # Check arm can be read self.arm.sync_read("Present_Position") def calibrate(self) -> None: - raise NotImplementedError + print(f"\nRunning calibration of {self.name} teleop") + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + + offsets = find_offset(self.arm) + min_max = find_min_max(self.arm) + + calibration = {} + for name in self.arm.names: + motor_id = self.arm.motors[name].id + calibration[str(motor_id)] = { + "name": name, + "homing_offset": offsets.get(name, 0), + "drive_mode": 0, + "min": min_max[name]["min"], + "max": min_max[name]["max"], + } + + with open(self.calibration_fpath, "w") as f: + json.dump(calibration, f, indent=4) + print("Calibration saved to", self.calibration_fpath) def set_calibration(self) -> None: """After calibration all motors function in human interpretable ranges. @@ -107,7 +134,6 @@ class SO100Teleop(Teleoperator): raise FileNotFoundError(self.calibration_fpath) self.arm.set_calibration(self.calibration_fpath) - apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration) def get_action(self) -> np.ndarray: """The returned action does not have a batch dimension.""" diff --git a/tests/motors/test_calibration.py b/tests/motors/test_calibration.py new file mode 100644 index 00000000..011174db --- /dev/null +++ b/tests/motors/test_calibration.py @@ -0,0 +1,147 @@ +import sys +from typing import Generator +from unittest.mock import Mock, call, patch + +import pytest +import scservo_sdk as scs + +from lerobot.common.motors import CalibrationMode, Motor +from lerobot.common.motors.calibration import find_min_max, find_offset, set_min_max, set_offset +from lerobot.common.motors.feetech import FeetechMotorsBus +from tests.mocks.mock_feetech import MockMotors, MockPortHandler + + +@pytest.fixture(autouse=True) +def patch_port_handler(): + if sys.platform == "darwin": + with patch.object(scs, "PortHandler", MockPortHandler): + yield + else: + yield + + +@pytest.fixture +def mock_motors() -> Generator[MockMotors, None, None]: + motors = MockMotors() + motors.open() + yield motors + motors.close() + + +@pytest.fixture +def dummy_motors() -> dict[str, Motor]: + return { + "dummy_1": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100), + "wrist_roll": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100), + "dummy_3": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100), + } + + +@pytest.fixture(autouse=True) +def patch_broadcast_ping(): + with patch.object(FeetechMotorsBus, "broadcast_ping", return_value={1: 777, 2: 777, 3: 777}): + yield + + +@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}") +def test_autouse_patch(): + """Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler.""" + assert scs.PortHandler is MockPortHandler + + +@pytest.mark.parametrize( + "motor_names, read_values", + [ + ( + ["dummy_1"], + [{"dummy_1": 3000}], + ), + ], + ids=["two-motors"], +) +def test_find_offset(mock_motors, dummy_motors, motor_names, read_values): + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + with ( + patch("builtins.input", return_value=""), + patch("lerobot.common.motors.calibration.set_offset") as mock_set_offset, + ): + motors_bus.sync_read = Mock(side_effect=[{"dummy_1": 3000}]) + motors_bus.motor_names = motor_names + motors_bus.write = Mock(return_value=None) + + find_offset(motors_bus) + # Compute the expected offset: 3000 - 2047 = 953. + expected_calls = [call(motors_bus, 953, "dummy_1")] + mock_set_offset.assert_has_calls(expected_calls, any_order=False) + + +def test_find_min_max(mock_motors, dummy_motors): + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + motors_bus.motor_names = list(dummy_motors.keys()) + read_side_effect = [ + {"dummy_1": 10, "wrist_roll": 20, "dummy_3": 30}, # For first sync_read call. + {"dummy_1": 4000, "wrist_roll": 2000, "dummy_3": 100}, # For second sync_read call. + {"dummy_1": 100, "wrist_roll": 4050, "dummy_3": 2010}, # For third sync_read call. + ] + motors_bus.sync_read = Mock(side_effect=read_side_effect) + + select_returns = [ + ([], [], []), # First iteration: no input. + ([], [], []), # Second iteration. + ([sys.stdin], [], []), # Third iteration: simulate pressing ENTER. + ] + with ( + patch("lerobot.common.motors.calibration.set_min_max") as mock_set_min_max, + patch("lerobot.common.motors.calibration.select.select", side_effect=select_returns), + patch("sys.stdin.readline", return_value="\n"), + ): + find_min_max(motors_bus) + + mock_set_min_max.assert_any_call(motors_bus, 10, 4000, "dummy_1") + mock_set_min_max.assert_any_call(motors_bus, 0, 4095, "wrist_roll") # wrist_roll is forced to [0,4095] + mock_set_min_max.assert_any_call(motors_bus, 30, 2010, "dummy_3") + assert mock_set_min_max.call_count == 3 + + +def test_set_offset_clamping(mock_motors, dummy_motors): + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + motors_bus.sync_read = Mock(return_value={"dummy_1": 2047}) + motors_bus.write = Mock() + # A very large offset should be clamped to +2047. + set_offset(motors_bus, 9999, "dummy_1") + motors_bus.write.assert_any_call("Offset", "dummy_1", 2047, raw_value=True) + + +def test_set_min_max(mock_motors, dummy_motors): + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + def _sync_read_side_effect(data_name, motors, *, raw_values=False): + if data_name == "Min_Angle_Limit": + return {"dummy_1": 100} + elif data_name == "Max_Angle_Limit": + return {"dummy_1": 3000} + return {} + + motors_bus.sync_read = Mock(side_effect=_sync_read_side_effect) + + motors_bus.write = Mock() + set_min_max(motors_bus, 100, 3000, "dummy_1") + motors_bus.write.assert_any_call("Min_Angle_Limit", "dummy_1", 100, raw_value=True) + motors_bus.write.assert_any_call("Max_Angle_Limit", "dummy_1", 3000, raw_value=True)