diff --git a/lerobot/common/robots/robot.py b/lerobot/common/robots/robot.py index a263a043e..7ae67ba53 100644 --- a/lerobot/common/robots/robot.py +++ b/lerobot/common/robots/robot.py @@ -1,7 +1,11 @@ import abc +from pathlib import Path from typing import Any +import draccus + from lerobot.common.constants import HF_LEROBOT_CALIBRATION, ROBOTS +from lerobot.common.motors import MotorCalibration from .config import RobotConfig @@ -23,6 +27,9 @@ class Robot(abc.ABC): ) self.calibration_dir.mkdir(parents=True, exist_ok=True) self.calibration_fpath = self.calibration_dir / f"{self.id}.json" + self.calibration: dict[str, MotorCalibration] = {} + if self.calibration_fpath.is_file(): + self._load_calibration() # TODO(aliberts): create a proper Feature class for this that links with datasets @abc.abstractproperty @@ -46,11 +53,29 @@ class Robot(abc.ABC): """Connects to the robot.""" pass + @abc.abstractmethod + def configure(self) -> None: + pass + + @abc.abstractproperty + def is_calibrated(self) -> bool: + pass + @abc.abstractmethod def calibrate(self) -> None: """Calibrates the robot.""" pass + def _load_calibration(self, fpath: Path | None = None) -> None: + fpath = self.calibration_fpath if fpath is None else fpath + with open(fpath) as f, draccus.config_type("json"): + self.calibration = draccus.load(dict[str, MotorCalibration], f) + + def _save_calibration(self, fpath: Path | None = None) -> None: + fpath = self.calibration_fpath if fpath is None else fpath + with open(fpath, "w") as f, draccus.config_type("json"): + draccus.dump(self.calibration, f, indent=4) + @abc.abstractmethod def get_observation(self) -> dict[str, Any]: """Gets observation from the robot.""" @@ -65,7 +90,3 @@ class Robot(abc.ABC): def disconnect(self) -> None: """Disconnects from the robot.""" pass - - def __del__(self): - if getattr(self, "is_connected", False): - self.disconnect() diff --git a/lerobot/common/robots/so100/robot_so100.py b/lerobot/common/robots/so100/robot_so100.py index 00432d4b0..78273723b 100644 --- a/lerobot/common/robots/so100/robot_so100.py +++ b/lerobot/common/robots/so100/robot_so100.py @@ -14,17 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import time - -import numpy as np +from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import Motor, MotorNormMode -from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, OperatingMode, @@ -86,35 +83,6 @@ class SO100Robot(Robot): } return cam_ft - def configure(self) -> None: - for name in self.arm.names: - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) - self.arm.write("Mode", name, OperatingMode.POSITION.value) - - # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", name, 16) - # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", name, 0) - self.arm.write("D_Coefficient", name, 32) - # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of - # the motors. Note: this configuration is not in the official STS3215 Memory Table - self.arm.write("Maximum_Acceleration", name, 254) - self.arm.write("Acceleration", name, 254) - - if not self.calibration_fpath.exists(): - print("Calibration file not found. Running calibration.") - self.calibrate() - else: - print("Calibration file found. Loading calibration data.") - set_calibration(self.arm, self.calibration_fpath) - - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) - - logging.info("Torque activated.") - @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras @@ -128,37 +96,73 @@ class SO100Robot(Robot): logging.info("Connecting arm.") self.arm.connect() - self.configure() + if not self.is_calibrated: + self.calibrate() - # Check arm can be read - self.arm.sync_read("Present_Position") + self.configure() # Connect the cameras for cam in self.cameras.values(): cam.connect() - def calibrate(self) -> None: - print(f"\nRunning calibration of {self.name} robot") - - offsets = find_offset(self.arm) - min_max = find_min_max(self.arm) - - calibration = {} + def configure(self) -> None: for name in self.arm.names: - motor_id = self.arm.motors[name].id - calibration[str(motor_id)] = { - "name": name, - "homing_offset": offsets.get(name, 0), - "drive_mode": 0, - "min": min_max[name]["min"], - "max": min_max[name]["max"], - } + # We assume that at connection time, arm is in a rest position, + # and torque can be safely disabled to run calibration. + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f, indent=4) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.arm.write("P_Coefficient", name, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.arm.write("I_Coefficient", name, 0) + self.arm.write("D_Coefficient", name, 32) + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of + # the motors. Note: this configuration is not in the official STS3215 Memory Table + self.arm.write("Maximum_Acceleration", name, 254) + self.arm.write("Acceleration", name, 254) + + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) + + logging.info("Torque activated.") + + @property + def is_calibrated(self) -> bool: + motors_calibration = self.arm.get_calibration() + return motors_calibration == self.calibration + + def calibrate(self) -> None: + print(f"\nRunning calibration of {self.id} SO-100 robot") + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + print( + "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." + ) + print("Recording positions. Press ENTER to stop...") + auto_range_motors = [name for name in self.arm.names if name != "wrist_roll"] + ranges = self.arm.register_ranges_of_motion(auto_range_motors) + ranges["wrist_roll"] = {"min": 0, "max": 4095} + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=ranges[name]["min"], + range_max=ranges[name]["max"], + ) + + self._save_calibration() print("Calibration saved to", self.calibration_fpath) - def get_observation(self) -> dict[str, np.ndarray]: + def get_observation(self) -> dict[str, Any]: """The returned observations do not have a batch dimension.""" if not self.is_connected: raise DeviceNotConnectedError( @@ -181,21 +185,18 @@ class SO100Robot(Robot): return obs_dict - def send_action(self, action: np.ndarray) -> np.ndarray: + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: """Command arm to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter `max_relative_target`. In this case, the action sent differs from original action. Thus, this function always returns the action actually sent. - Args: - action (np.ndarray): array containing the goal positions for the motors. - Raises: RobotDeviceNotConnectedError: if robot is not connected. Returns: - np.ndarray: the action sent to the motors, potentially clipped. + the action sent to the motors, potentially clipped. """ if not self.is_connected: raise DeviceNotConnectedError( @@ -211,7 +212,7 @@ class SO100Robot(Robot): goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.sync_write("Goal_Position", goal_pos.astype(np.int32)) + self.arm.sync_write("Goal_Position", goal_pos) return goal_pos