Add new calibration method for robot refactor (#896)
Co-authored-by: Simon Alibert <simon.alibert@huggingface.co>
This commit is contained in:
@@ -1,4 +1,29 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import select
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .dynamixel.dynamixel import DynamixelMotorsBus
|
||||
from .feetech.feetech import FeetechMotorsBus
|
||||
from .motors_bus import MotorsBus
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -8,3 +33,146 @@ class MotorCalibration:
|
||||
homing_offset: int
|
||||
range_min: int
|
||||
range_max: int
|
||||
|
||||
|
||||
def find_offset(motorbus: MotorsBus):
|
||||
input("Move robot to the middle of its range of motion and press ENTER....")
|
||||
for name in motorbus.names:
|
||||
# Also reset to defaults
|
||||
if isinstance(motorbus, FeetechMotorsBus):
|
||||
motorbus.write("Offset", name, 0, raw_value=True)
|
||||
motorbus.write("Min_Angle_Limit", name, 0, raw_value=True)
|
||||
motorbus.write("Max_Angle_Limit", name, 4095, raw_value=True)
|
||||
elif isinstance(motorbus, DynamixelMotorsBus):
|
||||
motorbus.write("Homing_Offset", name, 0, raw_value=True)
|
||||
motorbus.write("Min_Position_Limit", name, 0, raw_value=True)
|
||||
motorbus.write("Max_Position_Limit", name, 4095, raw_value=True)
|
||||
else:
|
||||
raise ValueError("Motorbus instance is unknown")
|
||||
|
||||
middle_values = motorbus.sync_read("Present_Position", raw_values=True)
|
||||
|
||||
offsets = {}
|
||||
for name, pos in middle_values.items():
|
||||
offset = pos - 2047 # Center the middle reading at 2047.
|
||||
MotorCalibration.set_offset(motorbus, offset, name)
|
||||
offsets[name] = offset
|
||||
|
||||
return offsets
|
||||
|
||||
|
||||
def find_min_max(motorbus: MotorsBus):
|
||||
print("Move all joints (except wrist_roll; id = 5) sequentially through their entire ranges of motion.")
|
||||
print("Recording positions. Press ENTER to stop...")
|
||||
|
||||
recorded_data = {name: [] for name in motorbus.names}
|
||||
|
||||
while True:
|
||||
positions = motorbus.sync_read("Present_Position", raw_values=True)
|
||||
for name in motorbus.names:
|
||||
recorded_data[name].append(positions[name])
|
||||
time.sleep(0.01)
|
||||
|
||||
# Check if user pressed Enter
|
||||
ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
|
||||
if ready_to_read:
|
||||
line = sys.stdin.readline()
|
||||
if line.strip() == "":
|
||||
break
|
||||
|
||||
min_max = {}
|
||||
for name in motorbus.names:
|
||||
motor_values = recorded_data[name]
|
||||
raw_min = min(motor_values)
|
||||
raw_max = max(motor_values)
|
||||
|
||||
if name == "wrist_roll":
|
||||
physical_min = 0
|
||||
physical_max = 4095
|
||||
else:
|
||||
physical_min = int(raw_min)
|
||||
physical_max = int(raw_max)
|
||||
|
||||
MotorCalibration.set_min_max(motorbus, physical_min, physical_max, name)
|
||||
min_max[name] = {"min": physical_min, "max": physical_max}
|
||||
|
||||
return min_max
|
||||
|
||||
|
||||
def set_calibration(motorbus: MotorsBus, calibration_fpath: Path) -> None:
|
||||
with open(calibration_fpath) as f:
|
||||
calibration = json.load(f)
|
||||
|
||||
motorbus.calibration = {int(id_): val for id_, val in calibration.items()}
|
||||
|
||||
for _, cal_data in motorbus.calibration.items():
|
||||
name = cal_data.get("name")
|
||||
if name not in motorbus.names:
|
||||
print(f"Motor name '{name}' from calibration not found in arm names.")
|
||||
continue
|
||||
|
||||
MotorCalibration.set_offset(motorbus, cal_data["homing_offset"], name)
|
||||
MotorCalibration.set_min_max(motorbus, cal_data["min"], cal_data["max"], name)
|
||||
|
||||
|
||||
def set_offset(motorbus: MotorsBus, homing_offset: int, name: str):
|
||||
homing_offset = int(homing_offset)
|
||||
|
||||
# Clamp to [-2047..+2047]
|
||||
if homing_offset > 2047:
|
||||
homing_offset = 2047
|
||||
print(
|
||||
f"Warning: '{homing_offset}' is getting clamped because its larger then 2047; This should not happen!"
|
||||
)
|
||||
elif homing_offset < -2047:
|
||||
homing_offset = -2047
|
||||
print(
|
||||
f"Warning: '{homing_offset}' is getting clamped because its smaller then -2047; This should not happen!"
|
||||
)
|
||||
|
||||
direction_bit = 1 if homing_offset < 0 else 0 # Determine the direction (sign) bit and magnitude
|
||||
magnitude = abs(homing_offset)
|
||||
servo_offset = (
|
||||
direction_bit << 11
|
||||
) | magnitude # Combine sign bit (bit 11) with the magnitude (bits 0..10)
|
||||
|
||||
if isinstance(motorbus, FeetechMotorsBus):
|
||||
motorbus.write("Offset", name, servo_offset, raw_value=True)
|
||||
read_offset = motorbus.sync_read("Offset", name, raw_values=True)
|
||||
elif isinstance(motorbus, DynamixelMotorsBus):
|
||||
motorbus.write("Homing_Offset", name, servo_offset, raw_value=True)
|
||||
read_offset = motorbus.sync_read("Homing_Offset", name, raw_values=True)
|
||||
else:
|
||||
raise ValueError("Motorbus instance is unknown")
|
||||
|
||||
if read_offset[name] != servo_offset:
|
||||
raise ValueError(
|
||||
f"Offset not set correctly for motor '{name}'. read: {read_offset} does not equal {servo_offset}"
|
||||
)
|
||||
|
||||
|
||||
def set_min_max(motorbus: MotorsBus, min: int, max: int, name: str):
|
||||
if isinstance(motorbus, FeetechMotorsBus):
|
||||
motorbus.write("Min_Angle_Limit", name, min, raw_value=True)
|
||||
motorbus.write("Max_Angle_Limit", name, max, raw_value=True)
|
||||
|
||||
read_min = motorbus.sync_read("Min_Angle_Limit", name, raw_values=True)
|
||||
read_max = motorbus.sync_read("Max_Angle_Limit", name, raw_values=True)
|
||||
elif isinstance(motorbus, DynamixelMotorsBus):
|
||||
motorbus.write("Min_Position_Limit", name, min, raw_value=True)
|
||||
motorbus.write("Max_Position_Limit", name, max, raw_value=True)
|
||||
|
||||
read_min = motorbus.sync_read("Min_Position_Limit", name, raw_values=True)
|
||||
read_max = motorbus.sync_read("Max_Position_Limit", name, raw_values=True)
|
||||
else:
|
||||
raise ValueError("Motorbus instance is unknown")
|
||||
|
||||
if read_min[name] != min:
|
||||
raise ValueError(
|
||||
f"Min is not set correctly for motor '{name}'. read: {read_min} does not equal {min}"
|
||||
)
|
||||
|
||||
if read_max[name] != max:
|
||||
raise ValueError(
|
||||
f"Max is not set correctly for motor '{name}'. read: {read_max} does not equal {max}"
|
||||
)
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
from .feetech import DriveMode, FeetechMotorsBus, OperatingMode, TorqueMode
|
||||
from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration
|
||||
from .tables import *
|
||||
|
||||
@@ -1,313 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import numpy as np
|
||||
|
||||
from ..motors_bus import (
|
||||
CalibrationMode,
|
||||
MotorsBus,
|
||||
)
|
||||
from .feetech import FeetechMotorsBus, TorqueMode
|
||||
from .tables import MODEL_RESOLUTION
|
||||
|
||||
URL_TEMPLATE = (
|
||||
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
||||
)
|
||||
|
||||
|
||||
def disable_torque(arm: MotorsBus):
|
||||
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
|
||||
raise ValueError("To run calibration, the torque must be disabled on all motors.")
|
||||
|
||||
|
||||
def get_calibration_modes(arm: MotorsBus):
|
||||
"""Returns calibration modes for each motor (DEGREE for rotational, LINEAR for gripper)."""
|
||||
return [
|
||||
CalibrationMode.LINEAR.name if name == "gripper" else CalibrationMode.DEGREE.name
|
||||
for name in arm.names
|
||||
]
|
||||
|
||||
|
||||
def reset_offset(motor_id, motor_bus):
|
||||
# Open the write lock, changes to EEPROM do NOT persist yet
|
||||
motor_bus.write("Lock", 1)
|
||||
|
||||
# Set offset to 0
|
||||
motor_name = motor_bus.motor_names[motor_id - 1]
|
||||
motor_bus.write("Offset", 0, motor_names=[motor_name])
|
||||
|
||||
# Close the write lock, changes to EEPROM do persist
|
||||
motor_bus.write("Lock", 0)
|
||||
|
||||
# Confirm that the offset is zero by reading it back
|
||||
confirmed_offset = motor_bus.read("Offset")[motor_id - 1]
|
||||
print(f"Offset for motor {motor_id} reset to: {confirmed_offset}")
|
||||
return confirmed_offset
|
||||
|
||||
|
||||
def calibrate_homing_motor(motor_id, motor_bus):
|
||||
reset_offset(motor_id, motor_bus)
|
||||
|
||||
home_ticks = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts at 0
|
||||
print(f"Encoder offset (present position in homing position): {home_ticks}")
|
||||
|
||||
return home_ticks
|
||||
|
||||
|
||||
def calibrate_linear_motor(motor_id, motor_bus):
|
||||
motor_names = motor_bus.motor_names
|
||||
motor_name = motor_names[motor_id - 1]
|
||||
|
||||
reset_offset(motor_id, motor_bus)
|
||||
|
||||
input(f"Close the {motor_name}, then press Enter...")
|
||||
start_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
|
||||
print(f" [Motor {motor_id}] start position recorded: {start_pos}")
|
||||
|
||||
input(f"Open the {motor_name} fully, then press Enter...")
|
||||
end_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
|
||||
print(f" [Motor {motor_id}] end position recorded: {end_pos}")
|
||||
|
||||
return start_pos, end_pos
|
||||
|
||||
|
||||
def single_motor_calibration(arm: MotorsBus, motor_id: int):
|
||||
"""Calibrates a single motor and returns its calibration data for updating the calibration file."""
|
||||
|
||||
disable_torque(arm)
|
||||
print(f"\n--- Calibrating Motor {motor_id} ---")
|
||||
|
||||
start_pos = 0
|
||||
end_pos = 0
|
||||
encoder_offset = 0
|
||||
|
||||
if motor_id == 6:
|
||||
start_pos, end_pos = calibrate_linear_motor(motor_id, arm)
|
||||
else:
|
||||
input("Move the motor to (zero) position, then press Enter...")
|
||||
encoder_offset = calibrate_homing_motor(motor_id, arm)
|
||||
|
||||
print(f"Calibration for motor ID:{motor_id} done.")
|
||||
|
||||
# Create a calibration dictionary for the single motor
|
||||
calib_dict = {
|
||||
"homing_offset": int(encoder_offset),
|
||||
"drive_mode": 0,
|
||||
"start_pos": int(start_pos),
|
||||
"end_pos": int(end_pos),
|
||||
"calib_mode": get_calibration_modes(arm)[motor_id - 1],
|
||||
"motor_name": arm.names[motor_id - 1],
|
||||
}
|
||||
|
||||
return calib_dict
|
||||
|
||||
|
||||
def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||
"""
|
||||
Runs a full calibration process for all motors in a robotic arm.
|
||||
|
||||
This function calibrates each motor in the arm, determining encoder offsets and
|
||||
start/end positions for linear and rotational motors. The calibration data is then
|
||||
stored in a dictionary for later use.
|
||||
|
||||
**Calibration Process:**
|
||||
- The user is prompted to move the arm to its homing position before starting.
|
||||
- Motors with rotational motion are calibrated using a homing method.
|
||||
- Linear actuators (e.g., grippers) are calibrated separately.
|
||||
- Encoder offsets, start positions, and end positions are recorded.
|
||||
|
||||
**Example Usage:**
|
||||
```python
|
||||
run_full_arm_calibration(arm, "so100", "left", "follower")
|
||||
```
|
||||
"""
|
||||
disable_torque(arm)
|
||||
|
||||
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
|
||||
|
||||
print("\nMove arm to homing position (middle)")
|
||||
print(
|
||||
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero")
|
||||
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
|
||||
input("Press Enter to continue...")
|
||||
|
||||
start_positions = np.zeros(len(arm.ids))
|
||||
end_positions = np.zeros(len(arm.ids))
|
||||
encoder_offsets = np.zeros(len(arm.ids))
|
||||
|
||||
modes = get_calibration_modes(arm)
|
||||
|
||||
for i, motor_id in enumerate(arm.ids):
|
||||
if modes[i] == CalibrationMode.DEGREE.name:
|
||||
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
|
||||
start_positions[i] = 0
|
||||
end_positions[i] = 0
|
||||
|
||||
for i, motor_id in enumerate(arm.ids):
|
||||
if modes[i] == CalibrationMode.LINEAR.name:
|
||||
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
|
||||
encoder_offsets[i] = 0
|
||||
|
||||
print("\nMove arm to rest position")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
print(f"\n calibration of {robot_type} {arm_name} {arm_type} done!")
|
||||
|
||||
# Force drive_mode values (can be static)
|
||||
drive_modes = [0, 1, 0, 0, 1, 0]
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": encoder_offsets.astype(int).tolist(),
|
||||
"drive_mode": drive_modes,
|
||||
"start_pos": start_positions.astype(int).tolist(),
|
||||
"end_pos": end_positions.astype(int).tolist(),
|
||||
"calib_mode": get_calibration_modes(arm),
|
||||
"motor_names": arm.names,
|
||||
}
|
||||
return calib_dict
|
||||
|
||||
|
||||
def run_full_auto_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||
"""TODO(pepijn): Add this method later as extra
|
||||
Example of usage:
|
||||
```python
|
||||
run_full_auto_arm_calibration(arm, "so100", "left", "follower")
|
||||
```
|
||||
"""
|
||||
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
|
||||
|
||||
|
||||
def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibration_dict: dict):
|
||||
"""
|
||||
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
|
||||
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
|
||||
|
||||
This version is modified so each homed position (originally 0) will now read
|
||||
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
|
||||
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
|
||||
after power cycling.
|
||||
|
||||
Steps:
|
||||
1) Subtract 2047 from the old offset (so 0 -> 2047).
|
||||
2) Clamp to [-2047..+2047].
|
||||
3) Encode sign bit and magnitude into a 12-bit number.
|
||||
"""
|
||||
|
||||
homing_offsets = calibration_dict["homing_offset"]
|
||||
motor_names = calibration_dict["motor_names"]
|
||||
start_pos = calibration_dict["start_pos"]
|
||||
|
||||
# Open the write lock, changes to EEPROM do NOT persist yet
|
||||
motorsbus.write("Lock", 1)
|
||||
|
||||
# For each motor, set the 'Offset' parameter
|
||||
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
|
||||
# If bus doesn’t have a motor named m_name, skip
|
||||
if m_name not in motorsbus.motors:
|
||||
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
|
||||
continue
|
||||
|
||||
if m_name == "gripper":
|
||||
old_offset = start_pos # If gripper set the offset to the start position of the gripper
|
||||
continue
|
||||
|
||||
# Shift the offset so the homed position reads 2047
|
||||
new_offset = old_offset - 2047
|
||||
|
||||
# Clamp to [-2047..+2047]
|
||||
if new_offset > 2047:
|
||||
new_offset = 2047
|
||||
print(
|
||||
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
|
||||
)
|
||||
elif new_offset < -2047:
|
||||
new_offset = -2047
|
||||
print(
|
||||
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
|
||||
)
|
||||
|
||||
# Determine the direction (sign) bit and magnitude
|
||||
direction_bit = 1 if new_offset < 0 else 0
|
||||
magnitude = abs(new_offset)
|
||||
|
||||
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
|
||||
servo_offset = (direction_bit << 11) | magnitude
|
||||
|
||||
# Write offset to servo
|
||||
motorsbus.write("Offset", servo_offset, motor_names=m_name)
|
||||
print(
|
||||
f"Set offset for {m_name}: "
|
||||
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
|
||||
)
|
||||
|
||||
motorsbus.write("Lock", 0)
|
||||
print("Offsets have been saved to EEPROM successfully.")
|
||||
|
||||
|
||||
def convert_ticks_to_degrees(ticks, model):
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
# Convert the ticks to degrees
|
||||
return ticks * (360.0 / resolutions)
|
||||
|
||||
|
||||
def convert_degrees_to_ticks(degrees, model):
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
# Convert degrees to motor ticks
|
||||
return int(degrees * (resolutions / 360.0))
|
||||
|
||||
|
||||
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
|
||||
"""
|
||||
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
|
||||
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
|
||||
"""
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
|
||||
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
|
||||
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
|
||||
|
||||
# If above halfway, fold it into negative territory => [-2048..+2047].
|
||||
if ticks > (resolutions // 2):
|
||||
ticks -= resolutions
|
||||
|
||||
# Flip sign if drive_mode is set.
|
||||
drive_mode = 0
|
||||
if motor_bus.calibration is not None:
|
||||
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
|
||||
|
||||
if drive_mode:
|
||||
ticks *= -1
|
||||
|
||||
return ticks
|
||||
|
||||
|
||||
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
|
||||
"""
|
||||
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
|
||||
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
|
||||
"""
|
||||
# Flip sign if drive_mode was set.
|
||||
drive_mode = 0
|
||||
if motor_bus.calibration is not None:
|
||||
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
|
||||
|
||||
if drive_mode:
|
||||
adjusted_pos *= -1
|
||||
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
|
||||
# Shift by +half-resolution and wrap into [0..res-1].
|
||||
# This undoes the earlier shift by -half-resolution.
|
||||
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
|
||||
|
||||
return ticks
|
||||
@@ -20,12 +20,10 @@
|
||||
# https://github.com/astral-sh/ruff/issues/3711
|
||||
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from typing import Protocol, TypeAlias, overload
|
||||
|
||||
@@ -414,12 +412,6 @@ class MotorsBus(abc.ABC):
|
||||
logger.error(e)
|
||||
return False
|
||||
|
||||
def set_calibration(self, calibration_fpath: Path) -> None:
|
||||
with open(calibration_fpath) as f:
|
||||
calibration = json.load(f)
|
||||
|
||||
self.calibration = {int(id_): val for id_, val in calibration.items()}
|
||||
|
||||
@abc.abstractmethod
|
||||
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
pass
|
||||
|
||||
@@ -149,6 +149,7 @@ class KochRobot(Robot):
|
||||
self.is_connected = True
|
||||
|
||||
def calibrate(self) -> None:
|
||||
# TODO(pepijn): Do calibration in same way as so100
|
||||
"""After calibration all motors function in human interpretable ranges.
|
||||
Rotations are expressed in degrees in nominal range of [-180, 180],
|
||||
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
|
||||
|
||||
@@ -18,7 +18,6 @@ and send orders to its motors.
|
||||
# TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated
|
||||
# calibration procedure, to make it easy for people to add their own robot.
|
||||
|
||||
import json
|
||||
import time
|
||||
import warnings
|
||||
from dataclasses import dataclass, field
|
||||
@@ -81,73 +80,6 @@ class ManipulatorRobotConfig(RobotConfig):
|
||||
)
|
||||
|
||||
|
||||
def apply_feetech_offsets_from_calibration(motorsbus, calibration_dict: dict):
|
||||
"""
|
||||
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
|
||||
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
|
||||
|
||||
This version is modified so each homed position (originally 0) will now read
|
||||
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
|
||||
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
|
||||
after power cycling.
|
||||
|
||||
Steps:
|
||||
1) Subtract 2047 from the old offset (so 0 -> 2047).
|
||||
2) Clamp to [-2047..+2047].
|
||||
3) Encode sign bit and magnitude into a 12-bit number.
|
||||
"""
|
||||
|
||||
homing_offsets = calibration_dict["homing_offset"]
|
||||
motor_names = calibration_dict["motor_names"]
|
||||
start_pos = calibration_dict["start_pos"]
|
||||
|
||||
# Open the write lock, changes to EEPROM do NOT persist yet
|
||||
motorsbus.write("Lock", 1)
|
||||
|
||||
# For each motor, set the 'Offset' parameter
|
||||
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
|
||||
# If bus doesn’t have a motor named m_name, skip
|
||||
if m_name not in motorsbus.motors:
|
||||
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
|
||||
continue
|
||||
|
||||
if m_name == "gripper":
|
||||
old_offset = start_pos # If gripper set the offset to the start position of the gripper
|
||||
continue
|
||||
|
||||
# Shift the offset so the homed position reads 2047
|
||||
new_offset = old_offset - 2047
|
||||
|
||||
# Clamp to [-2047..+2047]
|
||||
if new_offset > 2047:
|
||||
new_offset = 2047
|
||||
print(
|
||||
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
|
||||
)
|
||||
elif new_offset < -2047:
|
||||
new_offset = -2047
|
||||
print(
|
||||
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
|
||||
)
|
||||
|
||||
# Determine the direction (sign) bit and magnitude
|
||||
direction_bit = 1 if new_offset < 0 else 0
|
||||
magnitude = abs(new_offset)
|
||||
|
||||
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
|
||||
servo_offset = (direction_bit << 11) | magnitude
|
||||
|
||||
# Write offset to servo
|
||||
motorsbus.write("Offset", servo_offset, motor_names=m_name)
|
||||
print(
|
||||
f"Set offset for {m_name}: "
|
||||
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
|
||||
)
|
||||
|
||||
motorsbus.write("Lock", 0)
|
||||
print("Offsets have been saved to EEPROM successfully.")
|
||||
|
||||
|
||||
class ManipulatorRobot:
|
||||
# TODO(rcadene): Implement force feedback
|
||||
"""This class allows to control any manipulator robot of various number of motors.
|
||||
@@ -347,8 +279,6 @@ class ManipulatorRobot:
|
||||
for name in self.leader_arms:
|
||||
self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value)
|
||||
|
||||
self.activate_calibration()
|
||||
|
||||
# Set robot preset (e.g. torque in leader gripper for Koch v1.1)
|
||||
if self.robot_type in ["koch", "koch_bimanual"]:
|
||||
self.set_koch_robot_preset()
|
||||
@@ -385,61 +315,6 @@ class ManipulatorRobot:
|
||||
|
||||
self.is_connected = True
|
||||
|
||||
def activate_calibration(self):
|
||||
"""After calibration all motors function in human interpretable ranges.
|
||||
Rotations are expressed in degrees in nominal range of [-180, 180],
|
||||
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
|
||||
"""
|
||||
|
||||
def load_or_run_calibration_(name, arm, arm_type):
|
||||
arm_id = get_arm_id(name, arm_type)
|
||||
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
|
||||
|
||||
if arm_calib_path.exists():
|
||||
with open(arm_calib_path) as f:
|
||||
calibration = json.load(f)
|
||||
else:
|
||||
# TODO(rcadene): display a warning in __init__ if calibration file not available
|
||||
print(f"Missing calibration file '{arm_calib_path}'")
|
||||
|
||||
if self.robot_type in ["koch", "koch_bimanual", "aloha"]:
|
||||
from lerobot.common.motors.dynamixel.dynamixel_calibration import run_arm_calibration
|
||||
|
||||
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type)
|
||||
|
||||
elif self.robot_type in ["so100", "moss", "lekiwi"]:
|
||||
from lerobot.common.motors.feetech.feetech_calibration import (
|
||||
run_full_arm_calibration,
|
||||
)
|
||||
|
||||
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
|
||||
|
||||
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
|
||||
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(arm_calib_path, "w") as f:
|
||||
json.dump(calibration, f)
|
||||
|
||||
return calibration
|
||||
|
||||
# For each follower arm
|
||||
|
||||
for name, arm_bus in self.follower_arms.items():
|
||||
calibration = load_or_run_calibration_(name, arm_bus, "follower")
|
||||
arm_bus.set_calibration(calibration)
|
||||
|
||||
# If this is a Feetech robot, also set the servo offset into EEPROM
|
||||
if self.robot_type in ["so100", "lekiwi"]:
|
||||
apply_feetech_offsets_from_calibration(arm_bus, calibration)
|
||||
|
||||
# For each leader arm
|
||||
for name, arm_bus in self.leader_arms.items():
|
||||
calibration = load_or_run_calibration_(name, arm_bus, "leader")
|
||||
arm_bus.set_calibration(calibration)
|
||||
|
||||
# Optionally also set offset for leader if you want the servo offsets as well
|
||||
if self.robot_type in ["so100", "lekiwi"]:
|
||||
apply_feetech_offsets_from_calibration(arm_bus, calibration)
|
||||
|
||||
def set_koch_robot_preset(self):
|
||||
def set_operating_mode_(arm):
|
||||
from lerobot.common.motors.dynamixel.dynamixel import TorqueMode
|
||||
@@ -540,9 +415,6 @@ class ManipulatorRobot:
|
||||
# Set I_Coefficient and D_Coefficient to default value 0 and 32
|
||||
self.follower_arms[name].write("I_Coefficient", 0)
|
||||
self.follower_arms[name].write("D_Coefficient", 32)
|
||||
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
|
||||
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
|
||||
self.follower_arms[name].write("Lock", 0)
|
||||
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
|
||||
# the motors. Note: this configuration is not in the official STS3215 Memory Table
|
||||
self.follower_arms[name].write("Maximum_Acceleration", 254)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
@@ -23,11 +24,11 @@ from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import CalibrationMode, Motor
|
||||
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
|
||||
from lerobot.common.motors.feetech import (
|
||||
FeetechMotorsBus,
|
||||
OperatingMode,
|
||||
TorqueMode,
|
||||
apply_feetech_offsets_from_calibration,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
@@ -97,15 +98,17 @@ class SO100Robot(Robot):
|
||||
# Set I_Coefficient and D_Coefficient to default value 0 and 32
|
||||
self.arm.write("I_Coefficient", name, 0)
|
||||
self.arm.write("D_Coefficient", name, 32)
|
||||
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
|
||||
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
|
||||
self.arm.write("Lock", name, 0)
|
||||
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
|
||||
# the motors. Note: this configuration is not in the official STS3215 Memory Table
|
||||
self.arm.write("Maximum_Acceleration", name, 254)
|
||||
self.arm.write("Acceleration", name, 254)
|
||||
|
||||
self.calibrate()
|
||||
if not self.calibration_fpath.exists():
|
||||
print("Calibration file not found. Running calibration.")
|
||||
self.calibrate()
|
||||
else:
|
||||
print("Calibration file found. Loading calibration data.")
|
||||
set_calibration(self.arm, self.calibration_fpath)
|
||||
|
||||
for name in self.arm.names:
|
||||
self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value)
|
||||
@@ -135,15 +138,25 @@ class SO100Robot(Robot):
|
||||
cam.connect()
|
||||
|
||||
def calibrate(self) -> None:
|
||||
raise NotImplementedError
|
||||
print(f"\nRunning calibration of {self.name} robot")
|
||||
|
||||
def set_calibration(self) -> None:
|
||||
if not self.calibration_fpath.exists():
|
||||
logging.error("Calibration file not found. Please run calibration first")
|
||||
raise FileNotFoundError(self.calibration_fpath)
|
||||
offsets = find_offset(self.arm)
|
||||
min_max = find_min_max(self.arm)
|
||||
|
||||
self.arm.set_calibration(self.calibration_fpath)
|
||||
apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
|
||||
calibration = {}
|
||||
for name in self.arm.names:
|
||||
motor_id = self.arm.motors[name].id
|
||||
calibration[str(motor_id)] = {
|
||||
"name": name,
|
||||
"homing_offset": offsets.get(name, 0),
|
||||
"drive_mode": 0,
|
||||
"min": min_max[name]["min"],
|
||||
"max": min_max[name]["max"],
|
||||
}
|
||||
|
||||
with open(self.calibration_fpath, "w") as f:
|
||||
json.dump(calibration, f, indent=4)
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def get_observation(self) -> dict[str, np.ndarray]:
|
||||
"""The returned observations do not have a batch dimension."""
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
@@ -21,11 +22,11 @@ import numpy as np
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import CalibrationMode, Motor
|
||||
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
|
||||
from lerobot.common.motors.feetech import (
|
||||
FeetechMotorsBus,
|
||||
TorqueMode,
|
||||
apply_feetech_offsets_from_calibration,
|
||||
)
|
||||
from lerobot.common.motors.feetech.feetech import TorqueMode
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .configuration_so100 import SO100TeleopConfig
|
||||
@@ -71,6 +72,13 @@ class SO100Teleop(Teleoperator):
|
||||
return {}
|
||||
|
||||
def configure(self) -> None:
|
||||
if not self.calibration_fpath.exists():
|
||||
print("Calibration file not found. Running calibration.")
|
||||
self.calibrate()
|
||||
else:
|
||||
print("Calibration file found. Loading calibration data.")
|
||||
set_calibration(self.arm, self.calibration_fpath)
|
||||
|
||||
# We assume that at connection time, arm is in a rest position,
|
||||
# and torque can be safely disabled to run calibration.
|
||||
for name in self.arm.names:
|
||||
@@ -89,13 +97,32 @@ class SO100Teleop(Teleoperator):
|
||||
logging.info("Connecting arm.")
|
||||
self.arm.connect()
|
||||
self.configure()
|
||||
self.calibrate()
|
||||
|
||||
# Check arm can be read
|
||||
self.arm.sync_read("Present_Position")
|
||||
|
||||
def calibrate(self) -> None:
|
||||
raise NotImplementedError
|
||||
print(f"\nRunning calibration of {self.name} teleop")
|
||||
for name in self.arm.names:
|
||||
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
|
||||
|
||||
offsets = find_offset(self.arm)
|
||||
min_max = find_min_max(self.arm)
|
||||
|
||||
calibration = {}
|
||||
for name in self.arm.names:
|
||||
motor_id = self.arm.motors[name].id
|
||||
calibration[str(motor_id)] = {
|
||||
"name": name,
|
||||
"homing_offset": offsets.get(name, 0),
|
||||
"drive_mode": 0,
|
||||
"min": min_max[name]["min"],
|
||||
"max": min_max[name]["max"],
|
||||
}
|
||||
|
||||
with open(self.calibration_fpath, "w") as f:
|
||||
json.dump(calibration, f, indent=4)
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def set_calibration(self) -> None:
|
||||
"""After calibration all motors function in human interpretable ranges.
|
||||
@@ -107,7 +134,6 @@ class SO100Teleop(Teleoperator):
|
||||
raise FileNotFoundError(self.calibration_fpath)
|
||||
|
||||
self.arm.set_calibration(self.calibration_fpath)
|
||||
apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
|
||||
|
||||
def get_action(self) -> np.ndarray:
|
||||
"""The returned action does not have a batch dimension."""
|
||||
|
||||
147
tests/motors/test_calibration.py
Normal file
147
tests/motors/test_calibration.py
Normal file
@@ -0,0 +1,147 @@
|
||||
import sys
|
||||
from typing import Generator
|
||||
from unittest.mock import Mock, call, patch
|
||||
|
||||
import pytest
|
||||
import scservo_sdk as scs
|
||||
|
||||
from lerobot.common.motors import CalibrationMode, Motor
|
||||
from lerobot.common.motors.calibration import find_min_max, find_offset, set_min_max, set_offset
|
||||
from lerobot.common.motors.feetech import FeetechMotorsBus
|
||||
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_port_handler():
|
||||
if sys.platform == "darwin":
|
||||
with patch.object(scs, "PortHandler", MockPortHandler):
|
||||
yield
|
||||
else:
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_motors() -> Generator[MockMotors, None, None]:
|
||||
motors = MockMotors()
|
||||
motors.open()
|
||||
yield motors
|
||||
motors.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dummy_motors() -> dict[str, Motor]:
|
||||
return {
|
||||
"dummy_1": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
|
||||
"wrist_roll": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
|
||||
"dummy_3": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_broadcast_ping():
|
||||
with patch.object(FeetechMotorsBus, "broadcast_ping", return_value={1: 777, 2: 777, 3: 777}):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
|
||||
def test_autouse_patch():
|
||||
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
|
||||
assert scs.PortHandler is MockPortHandler
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"motor_names, read_values",
|
||||
[
|
||||
(
|
||||
["dummy_1"],
|
||||
[{"dummy_1": 3000}],
|
||||
),
|
||||
],
|
||||
ids=["two-motors"],
|
||||
)
|
||||
def test_find_offset(mock_motors, dummy_motors, motor_names, read_values):
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
with (
|
||||
patch("builtins.input", return_value=""),
|
||||
patch("lerobot.common.motors.calibration.set_offset") as mock_set_offset,
|
||||
):
|
||||
motors_bus.sync_read = Mock(side_effect=[{"dummy_1": 3000}])
|
||||
motors_bus.motor_names = motor_names
|
||||
motors_bus.write = Mock(return_value=None)
|
||||
|
||||
find_offset(motors_bus)
|
||||
# Compute the expected offset: 3000 - 2047 = 953.
|
||||
expected_calls = [call(motors_bus, 953, "dummy_1")]
|
||||
mock_set_offset.assert_has_calls(expected_calls, any_order=False)
|
||||
|
||||
|
||||
def test_find_min_max(mock_motors, dummy_motors):
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
motors_bus.motor_names = list(dummy_motors.keys())
|
||||
read_side_effect = [
|
||||
{"dummy_1": 10, "wrist_roll": 20, "dummy_3": 30}, # For first sync_read call.
|
||||
{"dummy_1": 4000, "wrist_roll": 2000, "dummy_3": 100}, # For second sync_read call.
|
||||
{"dummy_1": 100, "wrist_roll": 4050, "dummy_3": 2010}, # For third sync_read call.
|
||||
]
|
||||
motors_bus.sync_read = Mock(side_effect=read_side_effect)
|
||||
|
||||
select_returns = [
|
||||
([], [], []), # First iteration: no input.
|
||||
([], [], []), # Second iteration.
|
||||
([sys.stdin], [], []), # Third iteration: simulate pressing ENTER.
|
||||
]
|
||||
with (
|
||||
patch("lerobot.common.motors.calibration.set_min_max") as mock_set_min_max,
|
||||
patch("lerobot.common.motors.calibration.select.select", side_effect=select_returns),
|
||||
patch("sys.stdin.readline", return_value="\n"),
|
||||
):
|
||||
find_min_max(motors_bus)
|
||||
|
||||
mock_set_min_max.assert_any_call(motors_bus, 10, 4000, "dummy_1")
|
||||
mock_set_min_max.assert_any_call(motors_bus, 0, 4095, "wrist_roll") # wrist_roll is forced to [0,4095]
|
||||
mock_set_min_max.assert_any_call(motors_bus, 30, 2010, "dummy_3")
|
||||
assert mock_set_min_max.call_count == 3
|
||||
|
||||
|
||||
def test_set_offset_clamping(mock_motors, dummy_motors):
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
motors_bus.sync_read = Mock(return_value={"dummy_1": 2047})
|
||||
motors_bus.write = Mock()
|
||||
# A very large offset should be clamped to +2047.
|
||||
set_offset(motors_bus, 9999, "dummy_1")
|
||||
motors_bus.write.assert_any_call("Offset", "dummy_1", 2047, raw_value=True)
|
||||
|
||||
|
||||
def test_set_min_max(mock_motors, dummy_motors):
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
def _sync_read_side_effect(data_name, motors, *, raw_values=False):
|
||||
if data_name == "Min_Angle_Limit":
|
||||
return {"dummy_1": 100}
|
||||
elif data_name == "Max_Angle_Limit":
|
||||
return {"dummy_1": 3000}
|
||||
return {}
|
||||
|
||||
motors_bus.sync_read = Mock(side_effect=_sync_read_side_effect)
|
||||
|
||||
motors_bus.write = Mock()
|
||||
set_min_max(motors_bus, 100, 3000, "dummy_1")
|
||||
motors_bus.write.assert_any_call("Min_Angle_Limit", "dummy_1", 100, raw_value=True)
|
||||
motors_bus.write.assert_any_call("Max_Angle_Limit", "dummy_1", 3000, raw_value=True)
|
||||
Reference in New Issue
Block a user