Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr

This commit is contained in:
Simon Alibert
2025-05-13 18:50:06 +02:00
35 changed files with 683 additions and 856 deletions

View File

@@ -182,7 +182,6 @@ available_robots = [
"aloha",
"so100",
"so101",
"moss",
]
# lists all available cameras from `lerobot/common/robot_devices/cameras`

View File

@@ -19,9 +19,9 @@ Example:
```shell
python -m lerobot.calibrate \
--device.type=so100_leader \
--device.port=/dev/tty.usbmodem58760431551 \
--device.id=blue
--teleop.type=so100_leader \
--teleop.port=/dev/tty.usbmodem58760431551 \
--teleop.id=blue
```
"""
@@ -51,7 +51,14 @@ from .common.teleoperators import koch_leader, so100_leader # noqa: F401
@dataclass
class CalibrateConfig:
device: RobotConfig | TeleoperatorConfig
teleop: TeleoperatorConfig | None = None
robot: RobotConfig | None = None
def __post_init__(self):
if bool(self.teleop) == bool(self.robot):
raise ValueError("Choose either a teleop or a robot.")
self.device = self.robot if self.robot else self.teleop
@draccus.wrap()

View File

@@ -1,3 +1,18 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class DeviceNotConnectedError(ConnectionError):
"""Exception raised when the device is not connected."""

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(Steven): Consider doing the following:
# from enum import Enum
# class MyControlTableKey(Enum):

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FIRMWARE_MAJOR_VERSION = (0, 1)
FIRMWARE_MINOR_VERSION = (1, 1)
MODEL_NUMBER = (3, 2)

View File

@@ -213,21 +213,19 @@ class GroupSyncWrite(Protocol):
class MotorsBus(abc.ABC):
"""The main LeRobot class for implementing motors buses.
"""
A MotorsBus allows to efficiently read and write to the attached motors.
It represents several motors daisy-chained together and connected through a serial port.
There are currently two implementations of this abstract class:
- DynamixelMotorsBus
- FeetechMotorsBus
Note: This class may evolve in the future should we add support for other manufacturers SDKs.
A MotorsBus allows to efficiently read and write to the attached motors.
It represents several motors daisy-chained together and connected through a serial port.
Note: This class may evolve in the future should we add support for other types of bus.
A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)).
To find the port, you can run our utility script:
```bash
python lerobot/scripts/find_motors_bus_port.py
python -m lerobot.find_port.py
>>> Finding all available ports for the MotorsBus.
>>> ['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
>>> Remove the usb cable from your MotorsBus and press Enter when done.
@@ -237,20 +235,20 @@ class MotorsBus(abc.ABC):
Example of usage for 1 Feetech sts3215 motor connected to the bus:
```python
motors_bus = FeetechMotorsBus(
bus = FeetechMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={"gripper": (6, "sts3215")},
motors={"my_motor": (1, "sts3215")},
)
motors_bus.connect()
bus.connect()
position = motors_bus.read("Present_Position")
position = bus.read("Present_Position", normalize=False)
# Move from a few motor steps as an example
few_steps = 30
motors_bus.write("Goal_Position", position + few_steps)
bus.write("Goal_Position", position + few_steps, normalize=False)
# When done, properly disconnect the port using
motors_bus.disconnect()
bus.disconnect()
```
"""
@@ -403,9 +401,20 @@ class MotorsBus(abc.ABC):
@property
def is_connected(self) -> bool:
"""bool: `True` if the underlying serial port is open."""
return self.port_handler.is_open
def connect(self, handshake: bool = True) -> None:
"""Open the serial port and initialise communication.
Args:
handshake (bool, optional): Pings every expected motor and performs additional
integrity checks specific to the implementation. Defaults to `True`.
Raises:
DeviceAlreadyConnectedError: The port is already open.
ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(
f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice."
@@ -431,8 +440,39 @@ class MotorsBus(abc.ABC):
def _handshake(self) -> None:
pass
def disconnect(self, disable_torque: bool = True) -> None:
"""Close the serial port (optionally disabling torque first).
Args:
disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before
closing the port. This can prevent damaging motors if they are left applying resisting torque
after disconnect.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
)
if disable_torque:
self.port_handler.clearPort()
self.port_handler.is_using = False
self.disable_torque(num_retry=5)
self.port_handler.closePort()
logger.debug(f"{self.__class__.__name__} disconnected.")
@classmethod
def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
"""Probe *port* at every supported baud-rate and list responding IDs.
Args:
port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``).
*args, **kwargs: Forwarded to the subclass constructor.
Returns:
dict[int, list[int]]: Mapping *baud-rate → list of motor IDs*
for every baud-rate that produced at least one response.
"""
bus = cls(port, {}, *args, **kwargs)
bus._connect(handshake=False)
baudrate_ids = {}
@@ -448,6 +488,22 @@ class MotorsBus(abc.ABC):
def setup_motor(
self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
) -> None:
"""Assign the correct ID and baud-rate to a single motor.
This helper temporarily switches to the motor's current settings, disables torque, sets the desired
ID, and finally programs the bus' default baud-rate.
Args:
motor (str): Key of the motor in :pyattr:`motors`.
initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided).
Defaults to None.
initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None.
Raises:
RuntimeError: The motor could not be found or its model number
does not match the expected one.
ConnectionError: Communication with the motor failed.
"""
if not self.is_connected:
self._connect(handshake=False)
@@ -479,10 +535,25 @@ class MotorsBus(abc.ABC):
@abc.abstractmethod
def configure_motors(self) -> None:
"""Write implementation-specific recommended settings to every motor.
Typical changes include shortening the return delay, increasing
acceleration limits or disabling safety locks.
"""
pass
@abc.abstractmethod
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
"""Disable torque on selected motors.
Disabling Torque allows to write to the motors' permanent memory area (EPROM/EEPROM).
Args:
motors (int | str | list[str] | None, optional): Target motors. Accepts a motor name, an ID, a
list of names or `None` to affect every registered motor. Defaults to `None`.
num_retry (int, optional): Number of additional retry attempts on communication failure.
Defaults to 0.
"""
pass
@abc.abstractmethod
@@ -491,10 +562,26 @@ class MotorsBus(abc.ABC):
@abc.abstractmethod
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
"""Enable torque on selected motors.
Args:
motor (int): Same semantics as :pymeth:`disable_torque`. Defaults to `None`.
model (str): _description_
num_retry (int, optional): _description_. Defaults to 0.
"""
pass
@contextmanager
def torque_disabled(self):
"""Context-manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
Examples:
>>> with bus.torque_disabled():
... # Safe operations here
... pass
"""
self.disable_torque()
try:
yield
@@ -502,34 +589,74 @@ class MotorsBus(abc.ABC):
self.enable_torque()
def set_timeout(self, timeout_ms: int | None = None):
"""Change the packet timeout used by the SDK.
Args:
timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls
back to :pyattr:`default_timeout`.
"""
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
self.port_handler.setPacketTimeoutMillis(timeout_ms)
def get_baudrate(self) -> int:
"""Return the current baud-rate configured on the port.
Returns:
int: Baud-rate in bits / second.
"""
return self.port_handler.getBaudRate()
def set_baudrate(self, baudrate: int) -> None:
"""Set a new UART baud-rate on the port.
Args:
baudrate (int): Desired baud-rate in bits / second.
Raises:
RuntimeError: The SDK failed to apply the change.
"""
present_bus_baudrate = self.port_handler.getBaudRate()
if present_bus_baudrate != baudrate:
logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
self.port_handler.setBaudRate(baudrate)
if self.port_handler.getBaudRate() != baudrate:
raise OSError("Failed to write bus baud rate.")
raise RuntimeError("Failed to write bus baud rate.")
@property
def is_calibrated(self) -> bool:
"""bool: ``True`` if the cached calibration matches the motors."""
return self.calibration == self.read_calibration()
@abc.abstractmethod
def read_calibration(self) -> dict[str, MotorCalibration]:
"""Read calibration parameters from the motors.
Returns:
dict[str, MotorCalibration]: Mapping *motor name → calibration*.
"""
pass
@abc.abstractmethod
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
"""Write calibration parameters to the motors and cache them.
Args:
calibration_dict (dict[str, MotorCalibration]): Calibration obtained from
:pymeth:`read_calibration` or crafted by the user.
"""
pass
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
"""Restore factory calibration for the selected motors.
Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
The in-memory :pyattr:`calibration` is cleared.
Args:
motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default)
resets every motor.
"""
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str, int)):
@@ -547,24 +674,16 @@ class MotorsBus(abc.ABC):
self.calibration = {}
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
"""
This assumes motors present positions are roughly in the middle of their desired range
"""Centre each motor range around its current position.
Step 1: Set homing and min max to 0
The function computes and writes a homing offset such that the present position becomes exactly one
half-turn (e.g. `2047` on a 12-bit encoder).
Step 2: Read Present_Position which will be Actual_Position since
Present_Position = Actual_Position ± Homing_Offset (1)
and Homing_Offset = 0 from step 1
Args:
motors (NameOrID | list[NameOrID] | None, optional): Motors to adjust. Defaults to all motors (`None`).
Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of 1
revolution. For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the
current Present_Position to be 2047.
In that example:
Present_Position = 2047 (2)
Actual_Position = X (read in step 2)
from (1) and (2):
=> Homing_Offset = ±(X - 2048)
Returns:
dict[NameOrID, Value]: Mapping *motor → written homing offset*.
"""
if motors is None:
motors = list(self.motors)
@@ -588,10 +707,19 @@ class MotorsBus(abc.ABC):
def record_ranges_of_motion(
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
"""
This assumes that the homing offsets have been set such that all possible values in the range of
motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should
typically be called prior to this.
"""Interactively record the min/max encoder values of each motor.
Move the joints by hand (with torque disabled) while the method streams live positions. Press
:kbd:`Enter` to finish.
Args:
motors (NameOrID | list[NameOrID] | None, optional): Motors to record.
Defaults to every motor (`None`).
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Returns:
tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the
extreme values observed for each motor.
"""
if motors is None:
motors = list(self.motors)
@@ -621,6 +749,7 @@ class MotorsBus(abc.ABC):
# Move cursor up to overwrite the previous output
move_cursor_up(len(motors) + 3)
# TODO(Steven, aliberts): add check to ensure mins and maxes are different
return mins, maxes
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
@@ -702,6 +831,17 @@ class MotorsBus(abc.ABC):
pass
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
"""Ping a single motor and return its model number.
Args:
motor (NameOrID): Target motor (name or ID).
num_retry (int, optional): Extra attempts before giving up. Defaults to `0`.
raise_on_error (bool, optional): If `True` communication errors raise exceptions instead of
returning `None`. Defaults to `False`.
Returns:
int | None: Motor model number or `None` on failure.
"""
id_ = self._get_motor_id(motor)
for n_try in range(1 + num_retry):
model_number, comm, error = self.packet_handler.ping(self.port_handler, id_)
@@ -724,6 +864,16 @@ class MotorsBus(abc.ABC):
@abc.abstractmethod
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
"""Ping every ID on the bus using the broadcast address.
Args:
num_retry (int, optional): Retry attempts. Defaults to `0`.
raise_on_error (bool, optional): When `True` failures raise an exception instead of returning
`None`. Defaults to `False`.
Returns:
dict[int, int] | None: Mapping *id → model number* or `None` if the call failed.
"""
pass
def read(
@@ -734,6 +884,18 @@ class MotorsBus(abc.ABC):
normalize: bool = True,
num_retry: int = 0,
) -> Value:
"""Read a register from a motor.
Args:
data_name (str): Control-table key (e.g. `"Present_Position"`).
motor (str): Motor name.
normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as
defined by the calibration.
num_retry (int, optional): Retry attempts. Defaults to `0`.
Returns:
Value: Raw or normalised value depending on *normalize*.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
@@ -791,6 +953,21 @@ class MotorsBus(abc.ABC):
def write(
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
) -> None:
"""Write a value to a single motor's register.
Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which
provides a guarantee that the value was written to the register successfully. In consequence, it is
slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring
motors.
Args:
data_name (str): Register name.
motor (str): Motor name.
value (Value): Value to write. If *normalize* is `True` the value is first converted to raw
units.
normalize (bool, optional): Enable or disable normalisation. Defaults to `True`.
num_retry (int, optional): Retry attempts. Defaults to `0`.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
@@ -844,6 +1021,17 @@ class MotorsBus(abc.ABC):
normalize: bool = True,
num_retry: int = 0,
) -> dict[str, Value]:
"""Read the same register from several motors at once.
Args:
data_name (str): Register name.
motors (str | list[str] | None, optional): Motors to query. `None` (default) reads every motor.
normalize (bool, optional): Normalisation flag. Defaults to `True`.
num_retry (int, optional): Retry attempts. Defaults to `0`.
Returns:
dict[str, Value]: Mapping *motor name → value*.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
@@ -928,6 +1116,19 @@ class MotorsBus(abc.ABC):
normalize: bool = True,
num_retry: int = 0,
) -> None:
"""Write the same register on multiple motors.
Contrary to :pymeth:`write`, this *does not* expects a response status packet emitted by the motor, which
can allow for lost packets. It is faster than :pymeth:`write` and should typically be used when
frequency matters and losing some packets is acceptable (e.g. teleoperation loops).
Args:
data_name (str): Register name.
values (Value | dict[str, Value]): Either a single value (applied to every motor) or a mapping
*motor name → value*.
normalize (bool, optional): If `True` (default) convert values from the user range to raw units.
num_retry (int, optional): Retry attempts. Defaults to `0`.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
@@ -980,17 +1181,3 @@ class MotorsBus(abc.ABC):
for id_, value in ids_values.items():
data = self._serialize_data(value, length)
self.sync_writer.addParam(id_, data)
def disconnect(self, disable_torque: bool = True) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
)
if disable_torque:
self.port_handler.clearPort()
self.port_handler.is_using = False
self.disable_torque(num_retry=5)
self.port_handler.closePort()
logger.debug(f"{self.__class__.__name__} disconnected.")

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from dataclasses import dataclass
from pathlib import Path

View File

@@ -90,8 +90,7 @@ Connect the usb cable from your computer and the 5V power supply to the leader a
```bash
python -m lerobot.setup_motors \
--device.type=so100_leader \
--device.port=/dev/tty.usbmodem575E0031751 \ # <- paste here the port found at previous step
--device.id=my_awesome_leader_arm # <- give it a nice, unique name
--device.port=/dev/tty.usbmodem575E0031751 # <- paste here the port found at previous step
```
Note that the command above is equivalent to running the following script:
@@ -103,7 +102,6 @@ Note that the command above is equivalent to running the following script:
config = KochLeaderConfig(
port="/dev/tty.usbmodem575E0031751",
id="my_awesome_leader_arm",
)
leader = KochLeader(config)
leader.setup_motors()
@@ -319,7 +317,7 @@ python lerobot/scripts/control_robot.py \
```
As you can see, it's almost the same command as previously used to record your training dataset. Two things changed:
1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint with (e.g. `outputs/train/eval_act_so100_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_so100_test`).
1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint with (e.g. `outputs/train/eval_act_so100_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_so100_test`).
2. The name of dataset begins by `eval` to reflect that you are running inference (e.g. `${HF_USER}/eval_act_so100_test`).
## K. More Information

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig

View File

@@ -48,7 +48,7 @@ class KochFollower(Robot):
def __init__(self, config: KochFollowerConfig):
super().__init__(config)
self.config = config
self.arm = DynamixelMotorsBus(
self.bus = DynamixelMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "xl430-w250", MotorNormMode.RANGE_M100_100),
@@ -64,7 +64,7 @@ class KochFollower(Robot):
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
@@ -83,7 +83,7 @@ class KochFollower(Robot):
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
@@ -93,7 +93,7 @@ class KochFollower(Robot):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -105,30 +105,30 @@ class KochFollower(Robot):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors]
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
print(
f"Move all joints except {full_turn_motors} sequentially through their entire "
"ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for motor in full_turn_motors:
range_mins[motor] = 0
range_maxes[motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
@@ -137,19 +137,19 @@ class KochFollower(Robot):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
with self.arm.torque_disabled():
self.arm.configure_motors()
with self.bus.torque_disabled():
self.bus.configure_motors()
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling
# the arm, you could end up with a servo with a position 0 or 4095 at a crucial point
for motor in self.arm.motors:
for motor in self.bus.motors:
if motor != "gripper":
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
# Use 'position control current based' for gripper to be limited by the limit of the current. For
# the follower gripper, it means it can grasp an object without forcing too much even tho, its
@@ -157,19 +157,19 @@ class KochFollower(Robot):
# For the leader gripper, it means we can use it as a physical trigger, since we can force with
# our finger to make it move, and it will move back to its original target position when we
# release the force.
self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
# Set better PID values to close the gap between recorded states and actions
# TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor
self.arm.write("Position_P_Gain", "elbow_flex", 1500)
self.arm.write("Position_I_Gain", "elbow_flex", 0)
self.arm.write("Position_D_Gain", "elbow_flex", 600)
self.bus.write("Position_P_Gain", "elbow_flex", 1500)
self.bus.write("Position_I_Gain", "elbow_flex", 0)
self.bus.write("Position_D_Gain", "elbow_flex", 600)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
@@ -179,7 +179,7 @@ class KochFollower(Robot):
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -214,19 +214,19 @@ class KochFollower(Robot):
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()

View File

@@ -1,337 +0,0 @@
This tutorial explains how to use [Moss v1](https://github.com/jess-moss/moss-robot-arms) with LeRobot.
## Source the parts
Follow this [README](https://github.com/jess-moss/moss-robot-arms). It contains the bill of materials with link to source the parts, as well as the instructions to 3D print the parts and advice if it's your first time printing or if you don't own a 3D printer already.
**Important**: Before assembling, you will first need to configure your motors. To this end, we provide a nice script, so let's first install LeRobot. After configuration, we will also guide you through assembly.
## Install LeRobot
On your computer:
1. [Install Miniconda](https://docs.anaconda.com/miniconda/#quick-command-line-install):
```bash
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
~/miniconda3/bin/conda init bash
```
2. Restart shell or `source ~/.bashrc`
3. Create and activate a fresh conda environment for lerobot
```bash
conda create -y -n lerobot python=3.10 && conda activate lerobot
```
4. Clone LeRobot:
```bash
git clone https://github.com/huggingface/lerobot.git ~/lerobot
```
5. Install ffmpeg in your environment:
When using `miniconda`, install `ffmpeg` in your environment:
```bash
conda install ffmpeg -c conda-forge
```
6. Install LeRobot with dependencies for the feetech motors:
```bash
cd ~/lerobot && pip install -e ".[feetech]"
```
## Configure the motors
Follow step 1 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic) which illustrates the use of our scripts below.
**Find USB ports associated to your arms**
To find the correct ports for each arm, run the utility script twice:
```bash
python lerobot/scripts/find_motors_bus_port.py
```
Example output when identifying the leader arm's port (e.g., `/dev/tty.usbmodem575E0031751` on Mac, or possibly `/dev/ttyACM0` on Linux):
```
Finding all available ports for the MotorBus.
['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
Remove the usb cable from your DynamixelMotorsBus and press Enter when done.
[...Disconnect leader arm and press Enter...]
The port of this DynamixelMotorsBus is /dev/tty.usbmodem575E0031751
Reconnect the usb cable.
```
Example output when identifying the follower arm's port (e.g., `/dev/tty.usbmodem575E0032081`, or possibly `/dev/ttyACM1` on Linux):
```
Finding all available ports for the MotorBus.
['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
Remove the usb cable from your DynamixelMotorsBus and press Enter when done.
[...Disconnect follower arm and press Enter...]
The port of this DynamixelMotorsBus is /dev/tty.usbmodem575E0032081
Reconnect the usb cable.
```
Troubleshooting: On Linux, you might need to give access to the USB ports by running:
```bash
sudo chmod 666 /dev/ttyACM0
sudo chmod 666 /dev/ttyACM1
```
#### Update config file
IMPORTANTLY: Now that you have your ports, update the **port** default values of [`MossRobotConfig`](../lerobot/common/robot_devices/robots/configs.py). You will find something like:
```python
@RobotConfig.register_subclass("moss")
@dataclass
class MossRobotConfig(ManipulatorRobotConfig):
calibration_dir: str = ".cache/calibration/moss"
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
leader_arms: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": FeetechMotorsBusConfig(
port="/dev/tty.usbmodem58760431091", <-- UPDATE HERE
motors={
# name: (index, model)
"shoulder_pan": [1, "sts3215"],
"shoulder_lift": [2, "sts3215"],
"elbow_flex": [3, "sts3215"],
"wrist_flex": [4, "sts3215"],
"wrist_roll": [5, "sts3215"],
"gripper": [6, "sts3215"],
},
),
}
)
follower_arms: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": FeetechMotorsBusConfig(
port="/dev/tty.usbmodem585A0076891", <-- UPDATE HERE
motors={
# name: (index, model)
"shoulder_pan": [1, "sts3215"],
"shoulder_lift": [2, "sts3215"],
"elbow_flex": [3, "sts3215"],
"wrist_flex": [4, "sts3215"],
"wrist_roll": [5, "sts3215"],
"gripper": [6, "sts3215"],
},
),
}
)
```
**Configure your motors**
Plug your first motor and run this script to set its ID to 1. It will also set its present position to 2048, so expect your motor to rotate:
```bash
python lerobot/scripts/configure_motor.py \
--port /dev/tty.usbmodem58760432961 \
--brand feetech \
--model sts3215 \
--baudrate 1000000 \
--id 1
```
Note: These motors are currently limitated. They can take values between 0 and 4096 only, which corresponds to a full turn. They can't turn more than that. 2048 is at the middle of this range, so we can take -2048 steps (180 degrees anticlockwise) and reach the maximum range, or take +2048 steps (180 degrees clockwise) and reach the maximum range. The configuration step also sets the homing offset to 0, so that if you misassembled the arm, you can always update the homing offset to account for a shift up to ± 2048 steps (± 180 degrees).
Then unplug your motor and plug the second motor and set its ID to 2.
```bash
python lerobot/scripts/configure_motor.py \
--port /dev/tty.usbmodem58760432961 \
--brand feetech \
--model sts3215 \
--baudrate 1000000 \
--id 2
```
Redo the process for all your motors until ID 6. Do the same for the 6 motors of the leader arm.
**Remove the gears of the 6 leader motors**
Follow step 2 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic). You need to remove the gear for the motors of the leader arm. As a result, you will only use the position encoding of the motor and reduce friction to more easily operate the leader arm.
**Add motor horn to the motors**
Follow step 3 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic). For Moss v1, you need to align the holes on the motor horn to the motor spline to be approximately 3, 6, 9 and 12 o'clock.
Try to avoid rotating the motor while doing so to keep position 2048 set during configuration. It is especially tricky for the leader motors as it is more sensible without the gears, but it's ok if it's a bit rotated.
## Assemble the arms
Follow step 4 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic). The first arm should take a bit more than 1 hour to assemble, but once you get used to it, you can do it under 1 hour for the second arm.
## Calibrate
Next, you'll need to calibrate your Moss v1 robot to ensure that the leader and follower arms have the same position values when they are in the same physical position. This calibration is essential because it allows a neural network trained on one Moss v1 robot to work on another.
**Manual calibration of follower arm**
/!\ Contrarily to step 6 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic) which illustrates the auto calibration, we will actually do manual calibration of follower for now.
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| <img src="../media/moss/follower_zero.webp?raw=true" alt="Moss v1 follower arm zero position" title="Moss v1 follower arm zero position" style="width:100%;"> | <img src="../media/moss/follower_rotated.webp?raw=true" alt="Moss v1 follower arm rotated position" title="Moss v1 follower arm rotated position" style="width:100%;"> | <img src="../media/moss/follower_rest.webp?raw=true" alt="Moss v1 follower arm rest position" title="Moss v1 follower arm rest position" style="width:100%;"> |
Make sure both arms are connected and run this script to launch manual calibration:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--robot.cameras='{}' \
--control.type=calibrate \
--control.arms='["main_follower"]'
```
**Manual calibration of leader arm**
Follow step 6 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| <img src="../media/moss/leader_zero.webp?raw=true" alt="Moss v1 leader arm zero position" title="Moss v1 leader arm zero position" style="width:100%;"> | <img src="../media/moss/leader_rotated.webp?raw=true" alt="Moss v1 leader arm rotated position" title="Moss v1 leader arm rotated position" style="width:100%;"> | <img src="../media/moss/leader_rest.webp?raw=true" alt="Moss v1 leader arm rest position" title="Moss v1 leader arm rest position" style="width:100%;"> |
Run this script to launch manual calibration:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--robot.cameras='{}' \
--control.type=calibrate \
--control.arms='["main_leader"]'
```
## Teleoperate
**Simple teleop**
Then you are ready to teleoperate your robot! Run this simple script (it won't connect and display the cameras):
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--robot.cameras='{}' \
--control.type=teleoperate
```
**Teleop with displaying cameras**
Follow [this guide to setup your cameras](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#c-add-your-cameras-with-opencvcamera). Then you will be able to display the cameras on your computer while you are teleoperating by running the following code. This is useful to prepare your setup before recording your first dataset.
> **NOTE:** To visualize the data, enable `--control.display_data=true`. This streams the data using `rerun`.
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--control.type=teleoperate
```
## Record a dataset
Once you're familiar with teleoperation, you can record your first dataset with Moss v1.
If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens):
```bash
huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
```
Store your Hugging Face repository name in a variable to run these commands:
```bash
HF_USER=$(huggingface-cli whoami | head -n 1)
echo $HF_USER
```
Record 2 episodes and upload your dataset to the hub:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/moss_test \
--control.tags='["moss","tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=2 \
--control.push_to_hub=true
```
Note: You can resume recording by adding `--control.resume=true`.
## Visualize a dataset
If you uploaded your dataset to the hub with `--control.push_to_hub=true`, you can [visualize your dataset online](https://huggingface.co/spaces/lerobot/visualize_dataset) by copy pasting your repo id given by:
```bash
echo ${HF_USER}/moss_test
```
If you didn't upload with `--control.push_to_hub=false`, you can also visualize it locally with:
```bash
python lerobot/scripts/visualize_dataset_html.py \
--repo-id ${HF_USER}/moss_test \
--local-files-only 1
```
## Replay an episode
Now try to replay the first episode on your robot:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--control.type=replay \
--control.fps=30 \
--control.repo_id=${HF_USER}/moss_test \
--control.episode=0
```
## Train a policy
To train a policy to control your robot, use the [`python lerobot/scripts/train.py`](../lerobot/scripts/train.py) script. A few arguments are required. Here is an example command:
```bash
python lerobot/scripts/train.py \
--dataset.repo_id=${HF_USER}/moss_test \
--policy.type=act \
--output_dir=outputs/train/act_moss_test \
--job_name=act_moss_test \
--policy.device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/moss_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor states, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_moss_test/checkpoints`.
## Evaluate your policy
You can use the `record` function from [`lerobot/scripts/control_robot.py`](../lerobot/scripts/control_robot.py) but with a policy checkpoint as input. For instance, run this command to record 10 evaluation episodes:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=moss \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/eval_act_moss_test \
--control.tags='["tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=10 \
--control.push_to_hub=true \
--control.policy.path=outputs/train/act_moss_test/checkpoints/last/pretrained_model
```
As you can see, it's almost the same command as previously used to record your training dataset. Two things changed:
1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint with (e.g. `outputs/train/eval_act_moss_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_moss_test`).
2. The name of dataset begins by `eval` to reflect that you are running inference (e.g. `${HF_USER}/eval_act_moss_test`).
## More
Follow this [previous tutorial](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#4-train-a-policy-on-your-data) for a more in-depth tutorial on controlling real robots with LeRobot.
If you have any question or need help, please reach out on Discord in the channel [`#moss-arm`](https://discord.com/channels/1216765309076115607/1275374638985252925).

View File

@@ -1,2 +0,0 @@
from .configuration_moss import MossRobotConfig
from .moss_follower import MossRobot

View File

@@ -1,30 +0,0 @@
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("moss")
@dataclass
class MossRobotConfig(RobotConfig):
# Port to connect to the robot
port: str
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
mock: bool = False
# motors
shoulder_pan: tuple = (1, "sts3215")
shoulder_lift: tuple = (2, "sts3215")
elbow_flex: tuple = (3, "sts3215")
wrist_flex: tuple = (4, "sts3215")
wrist_roll: tuple = (5, "sts3215")
gripper: tuple = (6, "sts3215")
# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@@ -1,215 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_moss import MossRobotConfig
logger = logging.getLogger(__name__)
class MossRobot(Robot):
"""
[Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss
"""
config_class = MossRobotConfig
name = "moss_follower"
def __init__(self, config: MossRobotConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor]
print(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.arm.torque_disabled():
self.arm.configure_motors()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", motor, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", motor, 0)
self.arm.write("D_Coefficient", motor, 32)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
obs_dict = self.arm.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from pathlib import Path
from typing import Any

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig

View File

@@ -45,7 +45,7 @@ class SO100Follower(Robot):
def __init__(self, config: SO100FollowerConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
@@ -61,7 +61,7 @@ class SO100Follower(Robot):
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
@@ -80,7 +80,7 @@ class SO100Follower(Robot):
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
@@ -90,7 +90,7 @@ class SO100Follower(Robot):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -103,29 +103,29 @@ class SO100Follower(Robot):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor]
unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor]
print(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
@@ -134,26 +134,26 @@ class SO100Follower(Robot):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.arm.torque_disabled():
self.arm.configure_motors()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
with self.bus.torque_disabled():
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", motor, 16)
self.bus.write("P_Coefficient", motor, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", motor, 0)
self.arm.write("D_Coefficient", motor, 32)
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 32)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
@@ -161,7 +161,7 @@ class SO100Follower(Robot):
# Read arm position
start = time.perf_counter()
obs_dict = self.arm.sync_read("Present_Position")
obs_dict = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -196,19 +196,19 @@ class SO100Follower(Robot):
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()

View File

@@ -45,7 +45,7 @@ class SO101Follower(Robot):
def __init__(self, config: SO101FollowerConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
@@ -61,7 +61,7 @@ class SO101Follower(Robot):
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
@@ -80,7 +80,7 @@ class SO101Follower(Robot):
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
@@ -90,7 +90,7 @@ class SO101Follower(Robot):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -103,25 +103,25 @@ class SO101Follower(Robot):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
print(
"Move all joints sequentially through their entire ranges "
"of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion()
range_mins, range_maxes = self.bus.record_ranges_of_motion()
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
@@ -130,26 +130,26 @@ class SO101Follower(Robot):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.arm.torque_disabled():
self.arm.configure_motors()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
with self.bus.torque_disabled():
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", motor, 16)
self.bus.write("P_Coefficient", motor, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", motor, 0)
self.arm.write("D_Coefficient", motor, 32)
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 32)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
@@ -157,7 +157,7 @@ class SO101Follower(Robot):
# Read arm position
start = time.perf_counter()
obs_dict = self.arm.sync_read("Present_Position")
obs_dict = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -192,19 +192,19 @@ class SO101Follower(Robot):
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pprint import pformat
@@ -14,12 +28,6 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
from .koch_follower.config_koch_follower import KochFollowerConfig
return KochFollowerConfig(**kwargs)
# elif robot_type == "koch_bimanual":
# return KochBimanualRobotConfig(**kwargs)
elif robot_type == "moss_follower":
from .moss_follower.configuration_moss import MossRobotConfig
return MossRobotConfig(**kwargs)
elif robot_type == "so100_follower":
from .so100_follower.config_so100_follower import SO100FollowerConfig

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig

View File

@@ -1,8 +1,16 @@
"""Contains logic to instantiate a robot, read information from its motors and cameras,
and send orders to its motors.
"""
# TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated
# calibration procedure, to make it easy for people to add their own robot.
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@@ -39,7 +47,7 @@ class ViperX(Robot):
):
super().__init__(config)
self.config = config
self.arm = DynamixelMotorsBus(
self.bus = DynamixelMotorsBus(
port=self.config.port,
motors={
"waist": Motor(1, "xm540-w270", MotorNormMode.RANGE_M100_100),
@@ -57,7 +65,7 @@ class ViperX(Robot):
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
@@ -76,7 +84,7 @@ class ViperX(Robot):
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
@@ -86,7 +94,7 @@ class ViperX(Robot):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -98,31 +106,31 @@ class ViperX(Robot):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors]
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
print(
f"Move all joints except {full_turn_motors} sequentially through their entire "
"ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for motor in full_turn_motors:
range_mins[motor] = 0
range_maxes[motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
@@ -131,36 +139,36 @@ class ViperX(Robot):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
with self.arm.torque_disabled():
self.arm.configure_motors()
with self.bus.torque_disabled():
self.bus.configure_motors()
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
# As a result, if only one of them is required to move to a certain position,
# the other will follow. This is to avoid breaking the motors.
self.arm.write("Secondary_ID", "shoulder_shadow", 2)
self.arm.write("Secondary_ID", "elbow_shadow", 4)
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
self.bus.write("Secondary_ID", "elbow_shadow", 4)
# Set a velocity limit of 131 as advised by Trossen Robotics
# TODO(aliberts): remove as it's actually useless in position control
self.arm.write("Velocity_Limit", 131)
self.bus.write("Velocity_Limit", 131)
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling
# the arm, you could end up with a servo with a position 0 or 4095 at a crucial point.
# See: https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11
for motor in self.arm.motors:
for motor in self.bus.motors:
if motor != "gripper":
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
# Use 'position control current based' for follower gripper to be limited by the limit of the
# current. It can grasp an object without forcing too much even tho, it's goal position is a
# complete grasp (both gripper fingers are ordered to join and reach a touch).
self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
def get_observation(self) -> dict[str, Any]:
"""The returned observations do not have a batch dimension."""
@@ -171,7 +179,7 @@ class ViperX(Robot):
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -206,19 +214,19 @@ class ViperX(Robot):
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from dataclasses import dataclass
from pathlib import Path

View File

@@ -44,7 +44,7 @@ class KochLeader(Teleoperator):
def __init__(self, config: KochLeaderConfig):
super().__init__(config)
self.config = config
self.arm = DynamixelMotorsBus(
self.bus = DynamixelMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "xl330-m077", MotorNormMode.RANGE_M100_100),
@@ -59,7 +59,7 @@ class KochLeader(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
@@ -67,13 +67,13 @@ class KochLeader(Teleoperator):
@property
def is_connected(self) -> bool:
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -82,33 +82,33 @@ class KochLeader(Teleoperator):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.arm.motors}
self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors}
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors]
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
print(
f"Move all joints except {full_turn_motors} sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for motor in full_turn_motors:
range_mins[motor] = 0
range_maxes[motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=drive_modes[motor],
@@ -117,43 +117,43 @@ class KochLeader(Teleoperator):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.arm.disable_torque()
self.arm.configure_motors()
for motor in self.arm.motors:
self.bus.disable_torque()
self.bus.configure_motors()
for motor in self.bus.motors:
if motor != "gripper":
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while
# assembling the arm, you could end up with a servo with a position 0 or 4095 at a crucial
# point
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# its goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
# Set gripper's goal pos in current position mode so that we can use it as a trigger.
self.arm.enable_torque("gripper")
self.arm.write("Goal_Position", "gripper", self.config.gripper_open_pos)
self.bus.enable_torque("gripper")
self.bus.write("Goal_Position", "gripper", self.config.gripper_open_pos)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start = time.perf_counter()
action = self.arm.sync_read("Present_Position")
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
@@ -167,5 +167,5 @@ class KochLeader(Teleoperator):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect()
self.bus.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -41,7 +41,7 @@ class SO100Leader(Teleoperator):
def __init__(self, config: SO100LeaderConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
@@ -56,7 +56,7 @@ class SO100Leader(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
@@ -64,13 +64,13 @@ class SO100Leader(Teleoperator):
@property
def is_connected(self) -> bool:
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -79,29 +79,29 @@ class SO100Leader(Teleoperator):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor]
unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor]
print(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
@@ -110,25 +110,25 @@ class SO100Leader(Teleoperator):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.arm.disable_torque()
self.arm.configure_motors()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus.disable_torque()
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.arm.sync_read("Present_Position")
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
@@ -142,5 +142,5 @@ class SO100Leader(Teleoperator):
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect()
self.bus.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -41,7 +41,7 @@ class SO101Leader(Teleoperator):
def __init__(self, config: SO101LeaderConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
@@ -56,7 +56,7 @@ class SO101Leader(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
@@ -64,13 +64,13 @@ class SO101Leader(Teleoperator):
@property
def is_connected(self) -> bool:
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -79,25 +79,25 @@ class SO101Leader(Teleoperator):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
print(
"Move all joints sequentially through their entire ranges "
"of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion()
range_mins, range_maxes = self.bus.record_ranges_of_motion()
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
@@ -106,25 +106,25 @@ class SO101Leader(Teleoperator):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.arm.disable_torque()
self.arm.configure_motors()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus.disable_torque()
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def setup_motors(self) -> None:
for motor in reversed(self.arm.motors):
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.arm.setup_motor(motor)
print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.arm.sync_read("Present_Position")
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
@@ -138,5 +138,5 @@ class SO101Leader(Teleoperator):
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect()
self.bus.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from pathlib import Path
from typing import Any

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config import TeleoperatorConfig
from .teleoperator import Teleoperator

View File

@@ -42,7 +42,7 @@ class WidowX(Teleoperator):
def __init__(self, config: WidowXConfig):
super().__init__(config)
self.config = config
self.arm = DynamixelMotorsBus(
self.bus = DynamixelMotorsBus(
port=self.config.port,
motors={
"waist": Motor(1, "xm430-w350", MotorNormMode.RANGE_M100_100),
@@ -59,7 +59,7 @@ class WidowX(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.arm.motors}
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
@@ -67,13 +67,13 @@ class WidowX(Teleoperator):
@property
def is_connected(self) -> bool:
return self.arm.is_connected
return self.bus.is_connected
def connect(self, calibrate: bool = True):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
@@ -82,34 +82,34 @@ class WidowX(Teleoperator):
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
return self.bus.is_calibrated
def calibrate(self) -> None:
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for motor in self.arm.motors:
self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.arm.motors}
self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors}
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors]
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
print(
f"Move all joints except {full_turn_motors} sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for motor in full_turn_motors:
range_mins[motor] = 0
range_maxes[motor] = 4095
self.calibration = {}
for motor, m in self.arm.motors.items():
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=drive_modes[motor],
@@ -118,26 +118,26 @@ class WidowX(Teleoperator):
range_max=range_maxes[motor],
)
self.arm.write_calibration(self.calibration)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.arm.disable_torque()
self.arm.configure_motors()
self.bus.disable_torque()
self.bus.configure_motors()
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
# As a result, if only one of them is required to move to a certain position,
# the other will follow. This is to avoid breaking the motors.
self.arm.write("Secondary_ID", "shoulder_shadow", 2)
self.arm.write("Secondary_ID", "elbow_shadow", 4)
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
self.bus.write("Secondary_ID", "elbow_shadow", 4)
def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start = time.perf_counter()
action = self.arm.sync_read("Present_Position")
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
@@ -150,5 +150,5 @@ class WidowX(Teleoperator):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect()
self.bus.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -1,3 +1,18 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def encode_sign_magnitude(value: int, sign_bit_index: int):
"""
https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import rerun as rr

View File

@@ -19,8 +19,8 @@ Example:
```shell
python -m lerobot.setup_motors \
--device.type=so100_leader \
--device.port=/dev/tty.usbmodem575E0031751
--teleop.type=so100_leader \
--teleop.port=/dev/tty.usbmodem575E0031751
```
"""
@@ -46,7 +46,14 @@ COMPATIBLE_DEVICES = [
@dataclass
class SetupConfig:
device: RobotConfig | TeleoperatorConfig
teleop: TeleoperatorConfig | None = None
robot: RobotConfig | None = None
def __post_init__(self):
if bool(self.teleop) == bool(self.robot):
raise ValueError("Choose either a teleop or a robot.")
self.device = self.robot if self.robot else self.teleop
@draccus.wrap()

View File

@@ -82,16 +82,15 @@ dev = ["pre-commit>=3.7.0", "debugpy>=1.8.1"]
dora = [
"gym-dora @ git+https://github.com/dora-rs/dora-lerobot.git#subdirectory=gym_dora ; python_version < '4.0'",
]
dynamixel = ["dynamixel-sdk>=3.7.31", "pynput>=1.7.7"]
feetech = ["feetech-servo-sdk>=1.0.0", "pynput>=1.7.7"]
dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0"]
intelrealsense = ["pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'"]
pi0 = ["transformers>=4.48.0"]
pusht = ["gym-pusht>=0.1.5 ; python_version < '4.0'"]
stretch = [
"hello-robot-stretch-body>=0.7.27 ; python_version < '4.0' and sys_platform == 'linux'",
"pyrender @ git+https://github.com/mmatl/pyrender.git ; sys_platform == 'linux'",
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
"pynput>=1.7.7",
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'"
]
test = ["pytest>=8.1.0", "pytest-cov>=5.0.0", "pyserial>=3.5", "mock-serial>=0.0.1 ; sys_platform != 'win32'"]
umi = ["imagecodecs>=2024.1.1"]

View File

@@ -11,7 +11,7 @@ from tests.mocks.mock_teleop import MockTeleopConfig
def test_calibrate():
robot_cfg = MockRobotConfig()
cfg = CalibrateConfig(device=robot_cfg)
cfg = CalibrateConfig(robot=robot_cfg)
calibrate(cfg)