Implement SO-100 leader calibration

This commit is contained in:
Simon Alibert
2025-03-31 18:16:42 +02:00
parent 8cc0232e73
commit 201311503f
2 changed files with 66 additions and 54 deletions

View File

@@ -14,19 +14,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
import time import time
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorNormMode from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
from lerobot.common.motors.feetech import ( from lerobot.common.motors.feetech import (
FeetechMotorsBus, FeetechMotorsBus,
OperatingMode,
TorqueMode,
) )
from lerobot.common.motors.feetech.feetech import TorqueMode
from ..teleoperator import Teleoperator from ..teleoperator import Teleoperator
from .configuration_so100 import SO100TeleopConfig from .configuration_so100 import SO100TeleopConfig
@@ -71,19 +68,6 @@ class SO100Teleop(Teleoperator):
def feedback_feature(self) -> dict: def feedback_feature(self) -> dict:
return {} return {}
def configure(self) -> None:
if not self.calibration_fpath.exists():
print("Calibration file not found. Running calibration.")
self.calibrate()
else:
print("Calibration file found. Loading calibration data.")
set_calibration(self.arm, self.calibration_fpath)
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
@property @property
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self.arm.is_connected return self.arm.is_connected
@@ -96,46 +80,53 @@ class SO100Teleop(Teleoperator):
logging.info("Connecting arm.") logging.info("Connecting arm.")
self.arm.connect() self.arm.connect()
if not self.is_calibrated:
self.calibrate()
self.configure() self.configure()
# Check arm can be read def configure(self) -> None:
self.arm.sync_read("Present_Position") # We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
def calibrate(self) -> None:
print(f"\nRunning calibration of {self.name} teleop")
for name in self.arm.names: for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
offsets = find_offset(self.arm) @property
min_max = find_min_max(self.arm) def is_calibrated(self) -> bool:
motors_calibration = self.arm.get_calibration()
return motors_calibration == self.calibration
calibration = {} def calibrate(self) -> None:
print(f"\nRunning calibration of {self.id} SO-100 teleop")
for name in self.arm.names: for name in self.arm.names:
motor_id = self.arm.motors[name].id self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
calibration[str(motor_id)] = { self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value)
"name": name,
"homing_offset": offsets.get(name, 0),
"drive_mode": 0,
"min": min_max[name]["min"],
"max": min_max[name]["max"],
}
with open(self.calibration_fpath, "w") as f: input("Move robot to the middle of its range of motion and press ENTER....")
json.dump(calibration, f, indent=4) homing_offsets = self.arm.set_half_turn_homings()
print(
"Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion."
)
print("Recording positions. Press ENTER to stop...")
auto_range_motors = [name for name in self.arm.names if name != "wrist_roll"]
ranges = self.arm.register_ranges_of_motion(auto_range_motors)
ranges["wrist_roll"] = {"min": 0, "max": 4095}
self.calibration = {}
for name, motor in self.arm.motors.items():
self.calibration[name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=homing_offsets[name],
range_min=ranges[name]["min"],
range_max=ranges[name]["max"],
)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath) print("Calibration saved to", self.calibration_fpath)
def set_calibration(self) -> None: def get_action(self) -> dict[str, float]:
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if not self.calibration_fpath.exists():
logging.error("Calibration file not found. Please run calibration first")
raise FileNotFoundError(self.calibration_fpath)
self.arm.set_calibration(self.calibration_fpath)
def get_action(self) -> np.ndarray:
"""The returned action does not have a batch dimension.""" """The returned action does not have a batch dimension."""
# Read arm position # Read arm position
before_read_t = time.perf_counter() before_read_t = time.perf_counter()
@@ -144,7 +135,7 @@ class SO100Teleop(Teleoperator):
return action return action
def send_feedback(self, feedback: np.ndarray) -> None: def send_feedback(self, feedback: dict[str, float]) -> None:
# TODO(rcadene, aliberts): Implement force feedback # TODO(rcadene, aliberts): Implement force feedback
pass pass

View File

@@ -1,7 +1,11 @@
import abc import abc
from pathlib import Path
from typing import Any from typing import Any
import draccus
from lerobot.common.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS from lerobot.common.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
from lerobot.common.motors.motors_bus import MotorCalibration
from .config import TeleoperatorConfig from .config import TeleoperatorConfig
@@ -22,6 +26,9 @@ class Teleoperator(abc.ABC):
) )
self.calibration_dir.mkdir(parents=True, exist_ok=True) self.calibration_dir.mkdir(parents=True, exist_ok=True)
self.calibration_fpath = self.calibration_dir / f"{self.id}.json" self.calibration_fpath = self.calibration_dir / f"{self.id}.json"
self.calibration: dict[str, MotorCalibration] = {}
if self.calibration_fpath.is_file():
self._load_calibration()
@abc.abstractproperty @abc.abstractproperty
def action_feature(self) -> dict: def action_feature(self) -> dict:
@@ -40,11 +47,29 @@ class Teleoperator(abc.ABC):
"""Connects to the teleoperator.""" """Connects to the teleoperator."""
pass pass
@abc.abstractmethod
def configure(self) -> None:
pass
@abc.abstractproperty
def is_calibrated(self) -> bool:
pass
@abc.abstractmethod @abc.abstractmethod
def calibrate(self) -> None: def calibrate(self) -> None:
"""Calibrates the teleoperator.""" """Calibrates the teleoperator."""
pass pass
def _load_calibration(self, fpath: Path | None = None) -> None:
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath) as f, draccus.config_type("json"):
self.calibration = draccus.load(dict[str, MotorCalibration], f)
def _save_calibration(self, fpath: Path | None = None) -> None:
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath, "w") as f, draccus.config_type("json"):
draccus.dump(self.calibration, f, indent=4)
@abc.abstractmethod @abc.abstractmethod
def get_action(self) -> dict[str, Any]: def get_action(self) -> dict[str, Any]:
"""Gets the action to send to a teleoperator.""" """Gets the action to send to a teleoperator."""
@@ -59,7 +84,3 @@ class Teleoperator(abc.ABC):
def disconnect(self) -> None: def disconnect(self) -> None:
"""Disconnects from the teleoperator.""" """Disconnects from the teleoperator."""
pass pass
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()