Add calibrate

This commit is contained in:
Simon Alibert
2025-05-08 18:27:19 +02:00
parent dd3e305164
commit 64303781c2
14 changed files with 91 additions and 20 deletions

60
lerobot/calibrate.py Normal file
View File

@@ -0,0 +1,60 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from dataclasses import asdict, dataclass
from pprint import pformat
import draccus
from lerobot.common.cameras import intel, opencv # noqa: F401
from lerobot.common.robots import ( # noqa: F401
Robot,
RobotConfig,
koch_follower,
make_robot_from_config,
so100_follower,
)
from lerobot.common.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
make_teleoperator_from_config,
)
from lerobot.common.utils.utils import init_logging
from .common.teleoperators import koch_leader, so100_leader # noqa: F401
@dataclass
class CalibrateConfig:
device: RobotConfig | TeleoperatorConfig
@draccus.wrap()
def calibrate(cfg: CalibrateConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
if isinstance(cfg.device, RobotConfig):
device = make_robot_from_config(cfg.device)
elif isinstance(cfg.device, TeleoperatorConfig):
device = make_teleoperator_from_config(cfg.device)
device.connect(calibrate=False)
device.calibrate()
device.disconnect()
if __name__ == "__main__":
calibrate()

View File

@@ -85,7 +85,7 @@ class KochFollower(Robot):
# TODO(aliberts): add cam.is_connected for cam in self.cameras # TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected return self.arm.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
""" """
We assume that at connection time, arm is in a rest position, We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration. and torque can be safely disabled to run calibration.
@@ -94,7 +94,7 @@ class KochFollower(Robot):
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
for cam in self.cameras.values(): for cam in self.cameras.values():

View File

@@ -104,12 +104,12 @@ class LeKiwi(Robot):
# TODO(aliberts): add cam.is_connected for cam in self.cameras # TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.bus.is_connected return self.bus.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect() self.bus.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
for cam in self.cameras.values(): for cam in self.cameras.values():

View File

@@ -82,7 +82,7 @@ class MossRobot(Robot):
# TODO(aliberts): add cam.is_connected for cam in self.cameras # TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected return self.arm.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
""" """
We assume that at connection time, arm is in a rest position, We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration. and torque can be safely disabled to run calibration.
@@ -91,7 +91,7 @@ class MossRobot(Robot):
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
# Connect the cameras # Connect the cameras

View File

@@ -48,7 +48,7 @@ class Robot(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
"""Connects to the robot.""" """Connects to the robot."""
pass pass

View File

@@ -82,7 +82,7 @@ class SO100Follower(Robot):
# TODO(aliberts): add cam.is_connected for cam in self.cameras # TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected return self.arm.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
""" """
We assume that at connection time, arm is in a rest position, We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration. and torque can be safely disabled to run calibration.
@@ -91,7 +91,7 @@ class SO100Follower(Robot):
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
# Connect the cameras # Connect the cameras

View File

@@ -78,7 +78,7 @@ class ViperX(Robot):
# TODO(aliberts): add cam.is_connected for cam in self.cameras # TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected return self.arm.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
""" """
We assume that at connection time, arm is in a rest position, We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration. and torque can be safely disabled to run calibration.
@@ -87,7 +87,7 @@ class ViperX(Robot):
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
for cam in self.cameras.values(): for cam in self.cameras.values():

View File

@@ -69,12 +69,12 @@ class KochLeader(Teleoperator):
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self.arm.is_connected return self.arm.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
self.configure() self.configure()

View File

@@ -66,12 +66,12 @@ class SO100Leader(Teleoperator):
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self.arm.is_connected return self.arm.is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
self.configure() self.configure()

View File

@@ -46,7 +46,7 @@ class Teleoperator(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
"""Connects to the teleoperator.""" """Connects to the teleoperator."""
pass pass

View File

@@ -69,12 +69,12 @@ class WidowX(Teleoperator):
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self.arm.is_connected return self.arm.is_connected
def connect(self): def connect(self, calibrate: bool = True):
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect() self.arm.connect()
if not self.is_calibrated: if not self.is_calibrated and calibrate:
self.calibrate() self.calibrate()
self.configure() self.configure()

View File

@@ -67,11 +67,13 @@ class MockRobot(Robot):
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self._is_connected return self._is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self._is_connected = True self._is_connected = True
if calibrate:
self.calibrate()
@property @property
def is_calibrated(self) -> bool: def is_calibrated(self) -> bool:

View File

@@ -51,11 +51,13 @@ class MockTeleop(Teleoperator):
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self._is_connected return self._is_connected
def connect(self) -> None: def connect(self, calibrate: bool = True) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected") raise DeviceAlreadyConnectedError(f"{self} already connected")
self._is_connected = True self._is_connected = True
if calibrate:
self.calibrate()
@property @property
def is_calibrated(self) -> bool: def is_calibrated(self) -> bool:

View File

@@ -1,5 +1,6 @@
import time import time
from lerobot.calibrate import CalibrateConfig, calibrate
from lerobot.record import DatasetRecordConfig, RecordConfig, record from lerobot.record import DatasetRecordConfig, RecordConfig, record
from lerobot.replay import DatasetReplayConfig, ReplayConfig, replay from lerobot.replay import DatasetReplayConfig, ReplayConfig, replay
from lerobot.teleoperate import TeleoperateConfig, teleoperate from lerobot.teleoperate import TeleoperateConfig, teleoperate
@@ -8,6 +9,12 @@ from tests.mocks.mock_robot import MockRobotConfig
from tests.mocks.mock_teleop import MockTeleopConfig from tests.mocks.mock_teleop import MockTeleopConfig
def test_calibrate():
robot_cfg = MockRobotConfig()
cfg = CalibrateConfig(device=robot_cfg)
calibrate(cfg)
def test_teleoperate(): def test_teleoperate():
robot_cfg = MockRobotConfig() robot_cfg = MockRobotConfig()
teleop_cfg = MockTeleopConfig() teleop_cfg = MockTeleopConfig()