Control aloha robot natively (#316)

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Remi
2024-09-04 19:28:05 +02:00
committed by GitHub
parent 27ba2951d1
commit 429a463aff
32 changed files with 898 additions and 390 deletions

View File

@@ -17,9 +17,12 @@ import cv2
import numpy as np
from PIL import Image
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
busy_wait,
)
from lerobot.common.utils.utils import capture_timestamp_utc
from lerobot.scripts.control_robot import busy_wait
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.

View File

@@ -1,4 +1,6 @@
import enum
import logging
import math
import time
import traceback
from copy import deepcopy
@@ -27,11 +29,28 @@ TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
# The following bounds define the lower and upper joints range (after calibration).
# For joints in degree (i.e. revolute joints), their nominal range is [-180, 180] degrees
# which corresponds to a half rotation on the left and half rotation on the right.
# Some joints might require higher range, so we allow up to [-270, 270] degrees until
# an error is raised.
LOWER_BOUND_DEGREE = -270
UPPER_BOUND_DEGREE = 270
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
# [-10, 110] until an error is raised.
LOWER_BOUND_LINEAR = -10
UPPER_BOUND_LINEAR = 110
HALF_TURN_DEGREE = 180
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
# https://emanual.robotis.com/docs/en/dxl/x/xm430-w350
# https://emanual.robotis.com/docs/en/dxl/x/xm540-w270
# https://emanual.robotis.com/docs/en/dxl/x/xc430-w150
# data_name: (address, size_byte)
X_SERIES_CONTROL_TABLE = {
@@ -109,6 +128,7 @@ MODEL_CONTROL_TABLE = {
"xl430-w250": X_SERIES_CONTROL_TABLE,
"xm430-w350": X_SERIES_CONTROL_TABLE,
"xm540-w270": X_SERIES_CONTROL_TABLE,
"xc430-w150": X_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
@@ -118,6 +138,7 @@ MODEL_RESOLUTION = {
"xl430-w250": 4096,
"xm430-w350": 4096,
"xm540-w270": 4096,
"xc430-w150": 4096,
}
MODEL_BAUDRATE_TABLE = {
@@ -127,20 +148,18 @@ MODEL_BAUDRATE_TABLE = {
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
"xc430-w150": X_SERIES_BAUDRATE_TABLE,
}
NUM_READ_RETRY = 10
NUM_WRITE_RETRY = 10
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]):
"""This function convert the degree range to the step range for indicating motors rotation.
It assums a motor achieves a full rotation by going from -180 degree position to +180.
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
"""This function converts the degree range to the step range for indicating motors rotation.
It assumes a motor achieves a full rotation by going from -180 degree position to +180.
The motor resolution (e.g. 4096) corresponds to the number of steps needed to achieve a full rotation.
"""
if isinstance(degrees, float):
degrees = np.array(degrees)
resolutions = [MODEL_RESOLUTION[model] for model in models]
steps = degrees / 180 * np.array(resolutions) / 2
steps = steps.astype(int)
@@ -250,20 +269,24 @@ class TorqueMode(enum.Enum):
DISABLED = 0
class OperatingMode(enum.Enum):
VELOCITY = 1
POSITION = 3
EXTENDED_POSITION = 4
CURRENT_CONTROLLED_POSITION = 5
PWM = 16
UNKNOWN = -1
class DriveMode(enum.Enum):
NON_INVERTED = 0
INVERTED = 1
class CalibrationMode(enum.Enum):
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
DEGREE = 0
# Joints with linear motions (like gripper of Aloha) are experessed in nominal range of [0, 100]
LINEAR = 1
class JointOutOfRangeError(Exception):
def __init__(self, message="Joint is out of range"):
self.message = message
super().__init__(self.message)
class DynamixelMotorsBus:
# TODO(rcadene): Add a script to find the motor indices without DynamixelWizzard2
"""
@@ -531,9 +554,22 @@ class DynamixelMotorsBus:
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def set_calibration(self, calibration: dict[str, tuple[int, bool]]):
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration_autocorrect(self, values: np.ndarray | list, motor_names: list[str] | None):
"""This function applies the calibration, automatically detects out of range errors for motors values and attempts to correct.
For more info, see docstring of `apply_calibration` and `autocorrect_calibration`.
"""
try:
values = self.apply_calibration(values, motor_names)
except JointOutOfRangeError as e:
print(e)
self.autocorrect_calibration(values, motor_names)
values = self.apply_calibration(values, motor_names)
return values
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree.
@@ -551,53 +587,197 @@ class DynamixelMotorsBus:
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32[ to centered signed int32 range [-2**31, 2**31[
values = values.astype(np.int32)
for i, name in enumerate(motor_names):
homing_offset, drive_mode = self.calibration[name]
# Update direction of rotation of the motor to match between leader and follower. In fact, the motor of the leader for a given joint
# can be assembled in an opposite direction in term of rotation than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
# Convert from range [-2**31, 2**31[ to nominal range ]-resolution, resolution[ (e.g. ]-2048, 2048[)
values[i] += homing_offset
# Convert from range ]-resolution, resolution[ to the universal float32 centered degree range ]-180, 180[
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
_, model = self.motors[name]
resolution = self.model_resolution[model]
values[i] = values[i] / (resolution // 2) * 180
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
# Convert from range [-2**31, 2**31] to
# nominal range [-resolution//2, resolution//2] (e.g. [-2048, 2048])
values[i] += homing_offset
# Convert from range [-resolution//2, resolution//2] to
# universal float32 centered degree range [-180, 180]
# (e.g. 2048 / (4096 // 2) * 180 = 180)
values[i] = values[i] / (resolution // 2) * HALF_TURN_DEGREE
if (values[i] < LOWER_BOUND_DEGREE) or (values[i] > UPPER_BOUND_DEGREE):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [-{HALF_TURN_DEGREE}, {HALF_TURN_DEGREE}] degrees (a full rotation), "
f"with a maximum range of [{LOWER_BOUND_DEGREE}, {UPPER_BOUND_DEGREE}] degrees to account for joints that can rotate a bit more, "
f"but present value is {values[i]} degree. "
"This might be due to a cable connection issue creating an artificial 360 degrees jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
return values
def autocorrect_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""This function automatically detects issues with values of motors after calibration, and correct for these issues.
Some motors might have values outside of expected maximum bounds after calibration.
For instance, for a joint in degree, its value can be outside [-270, 270] degrees, which is totally unexpected given
a nominal range of [-180, 180] degrees, which represents half a turn to the left or right starting from zero position.
Known issues:
#1: Motor value randomly shifts of a full turn, caused by hardware/connection errors.
#2: Motor internal homing offset is shifted by a full turn, caused by using default calibration (e.g Aloha).
#3: motor internal homing offset is shifted by less or more than a full turn, caused by using default calibration
or by human error during manual calibration.
Issues #1 and #2 can be solved by shifting the calibration homing offset by a full turn.
Issue #3 will be visually detected by user and potentially captured by the safety feature `max_relative_target`,
that will slow down the motor, raise an error asking to recalibrate. Manual recalibrating will solve the issue.
Note: A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
# Convert from initial range to range [-180, 180] degrees
calib_val = (values[i] + homing_offset) / (resolution // 2) * HALF_TURN_DEGREE
in_range = (calib_val > LOWER_BOUND_DEGREE) and (calib_val < UPPER_BOUND_DEGREE)
# Solve this inequality to find the factor to shift the range into [-180, 180] degrees
# values[i] = (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE
# - HALF_TURN_DEGREE <= (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE <= HALF_TURN_DEGREE
# (- (resolution // 2) - values[i] - homing_offset) / resolution <= factor <= ((resolution // 2) - values[i] - homing_offset) / resolution
low_factor = (-(resolution // 2) - values[i] - homing_offset) / resolution
upp_factor = ((resolution // 2) - values[i] - homing_offset) / resolution
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from initial range to range [0, 100] in %
calib_val = (values[i] - start_pos) / (end_pos - start_pos) * 100
in_range = (calib_val > LOWER_BOUND_LINEAR) and (calib_val < UPPER_BOUND_LINEAR)
# Solve this inequality to find the factor to shift the range into [0, 100] %
# values[i] = (values[i] - start_pos + resolution * factor) / (end_pos + resolution * factor - start_pos - resolution * factor) * 100
# values[i] = (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100
# 0 <= (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100 <= 100
# (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution
low_factor = (start_pos - values[i]) / resolution
upp_factor = (end_pos - values[i]) / resolution
if not in_range:
# Get first integer between the two bounds
if low_factor < upp_factor:
factor = math.ceil(low_factor)
if factor > upp_factor:
raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]")
else:
factor = math.ceil(upp_factor)
if factor > low_factor:
raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]")
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
out_of_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees"
in_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees"
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
logging.warning(
f"Auto-correct calibration of motor '{name}' by shifting value by {abs(factor)} full turns, "
f"from '{out_of_range_str}' to '{in_range_str}'."
)
# A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
self.calibration["homing_offset"][calib_idx] += resolution * factor
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Inverse of `apply_calibration`."""
if motor_names is None:
motor_names = self.motor_names
# Convert from the universal float32 centered degree range ]-180, 180[ to resolution range ]-resolution, resolution[
for i, name in enumerate(motor_names):
_, model = self.motors[name]
resolution = self.model_resolution[model]
values[i] = values[i] / 180 * (resolution // 2)
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Convert from nominal 0-centered degree range [-180, 180] to
# 0-centered resolution range (e.g. [-2048, 2048] for resolution=4096)
values[i] = values[i] / HALF_TURN_DEGREE * (resolution // 2)
# Substract the homing offsets to come back to actual motor range of values
# which can be arbitrary.
values[i] -= homing_offset
# Remove drive mode, which is the rotation direction of the motor, to come back to
# actual motor rotation direction which can be arbitrary.
if drive_mode:
values[i] *= -1
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from nominal lnear range of [0, 100] % to
# actual motor range of values which can be arbitrary.
values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
values = np.round(values).astype(np.int32)
# Convert from nominal range ]-resolution, resolution[ to centered signed int32 range [-2**31, 2**31[
for i, name in enumerate(motor_names):
homing_offset, drive_mode = self.calibration[name]
values[i] -= homing_offset
# Update direction of rotation of the motor that was matching between leader and follower to their original direction.
# In fact, the motor of the leader for a given joint can be assembled in an opposite direction in term of rotation
# than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
return values
def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
@@ -683,19 +863,7 @@ class DynamixelMotorsBus:
values = values.astype(np.int32)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.apply_calibration(values, motor_names)
# We expect our motors to stay in a nominal range of [-180, 180] degrees
# which corresponds to a half turn rotation.
# However, some motors can turn a bit more, hence we extend the nominal range to [-270, 270]
# which is less than a full 360 degree rotation.
if not np.all((values > -270) & (values < 270)):
raise ValueError(
f"Wrong motor position range detected. "
f"Expected to be in [-270, +270] but in [{values.min()}, {values.max()}]. "
"This might be due to a cable connection issue creating an artificial 360 degrees jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
values = self.apply_calibration_autocorrect(values, motor_names)
# log the number of seconds it took to read the data from the motors
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_names)

View File

@@ -1,6 +1,7 @@
import json
import logging
import pickle
import time
import warnings
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import Sequence
@@ -10,11 +11,12 @@ import torch
from lerobot.common.robot_devices.cameras.utils import Camera
from lerobot.common.robot_devices.motors.dynamixel import (
OperatingMode,
CalibrationMode,
TorqueMode,
convert_degrees_to_steps,
)
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.robots.utils import get_arm_id
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
########################################################################
@@ -25,7 +27,8 @@ URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
)
# In nominal degree range ]-180, +180[
# The following positions are provided in nominal degree range ]-180, +180[
# For more info on these constants, see comments in the code where they get used.
ZERO_POSITION_DEGREE = 0
ROTATED_POSITION_DEGREE = 90
@@ -45,27 +48,13 @@ def apply_drive_mode(position, drive_mode):
return position
def reset_torque_mode(arm: MotorsBus):
# To be configured, all servos must be in "torque disable" mode
arm.write("Torque_Enable", TorqueMode.DISABLED.value)
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
if len(all_motors_except_gripper) > 0:
arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
arm.write("Operating_Mode", OperatingMode.CURRENT_CONTROLLED_POSITION.value, "gripper")
def compute_nearest_rounded_position(position, models):
delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models)
nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn
return nearest_pos.astype(position.dtype)
def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""This function ensures that a neural network trained on data collected on a given robot
can work on another robot. For instance before calibration, setting a same goal position
for each motor of two different robots will get two very different positions. But after calibration,
@@ -84,38 +73,27 @@ def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
Example of usage:
```python
run_arm_calibration(arm, "left", "follower")
run_arm_calibration(arm, "koch", "left", "follower")
```
"""
reset_torque_mode(arm)
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
print(f"\nRunning calibration of {name} {arm_type}...")
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to zero position")
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="zero"))
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero"))
input("Press Enter to continue...")
# We arbitrarely choosed our zero target position to be a straight horizontal position with gripper upwards and closed.
# We arbitrarily chose our zero target position to be a straight horizontal position with gripper upwards and closed.
# It is easy to identify and all motors are in a "quarter turn" position. Once calibration is done, this position will
# corresponds to every motor angle being 0. If you set all 0 as Goal Position, the arm will move in this position.
zero_position = convert_degrees_to_steps(ZERO_POSITION_DEGREE, arm.motor_models)
def _compute_nearest_rounded_position(position, models):
# TODO(rcadene): Rework this function since some motors cant physically rotate a quarter turn
# (e.g. the gripper of Aloha arms can only rotate ~50 degree)
quarter_turn_degree = 90
quarter_turn = convert_degrees_to_steps(quarter_turn_degree, models)
nearest_pos = np.round(position.astype(float) / quarter_turn) * quarter_turn
return nearest_pos.astype(position.dtype)
# correspond to every motor angle being 0. If you set all 0 as Goal Position, the arm will move in this position.
zero_target_pos = convert_degrees_to_steps(ZERO_POSITION_DEGREE, arm.motor_models)
# Compute homing offset so that `present_position + homing_offset ~= target_position`.
position = arm.read("Present_Position")
position = _compute_nearest_rounded_position(position, arm.motor_models)
homing_offset = zero_position - position
print("\nMove arm to rotated target position")
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rotated"))
input("Press Enter to continue...")
zero_pos = arm.read("Present_Position")
zero_nearest_pos = compute_nearest_rounded_position(zero_pos, arm.motor_models)
homing_offset = zero_target_pos - zero_nearest_pos
# The rotated target position corresponds to a rotation of a quarter turn from the zero position.
# This allows to identify the rotation direction of each motor.
@@ -124,44 +102,83 @@ def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
# Sometimes, there is only one possible rotation direction. For instance, if the gripper is closed, there is only one direction which
# corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarely rotate clockwise from the point of view
# of the previous motor in the kinetic chain.
rotated_position = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
print("\nMove arm to rotated target position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rotated"))
input("Press Enter to continue...")
rotated_target_pos = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
# Find drive mode by rotating each motor by a quarter of a turn.
# Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0).
position = arm.read("Present_Position")
position += homing_offset
position = _compute_nearest_rounded_position(position, arm.motor_models)
drive_mode = (position != rotated_position).astype(np.int32)
rotated_pos = arm.read("Present_Position")
drive_mode = (rotated_pos < zero_pos).astype(np.int32)
# Re-compute homing offset to take into account drive mode
position = arm.read("Present_Position")
position = apply_drive_mode(position, drive_mode)
position = _compute_nearest_rounded_position(position, arm.motor_models)
homing_offset = rotated_position - position
rotated_drived_pos = apply_drive_mode(rotated_pos, drive_mode)
rotated_nearest_pos = compute_nearest_rounded_position(rotated_drived_pos, arm.motor_models)
homing_offset = rotated_target_pos - rotated_nearest_pos
print("\nMove arm to rest position")
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rest"))
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest"))
input("Press Enter to continue...")
print()
return homing_offset, drive_mode
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
calib_mode = [CalibrationMode.DEGREE.name] * len(arm.motor_names)
# TODO(rcadene): make type of joints (DEGREE or LINEAR) configurable from yaml?
if robot_type == "aloha" and "gripper" in arm.motor_names:
# Joints with linear motions (like gripper of Aloha) are experessed in nominal range of [0, 100]
calib_idx = arm.motor_names.index("gripper")
calib_mode[calib_idx] = CalibrationMode.LINEAR.name
calib_data = {
"homing_offset": homing_offset.tolist(),
"drive_mode": drive_mode.tolist(),
"start_pos": zero_pos.tolist(),
"end_pos": rotated_pos.tolist(),
"calib_mode": calib_mode,
"motor_names": arm.motor_names,
}
return calib_data
def ensure_safe_goal_position(
goal_pos: torch.Tensor, present_pos: torch.Tensor, max_relative_target: float | list[float]
):
# Cap relative action target magnitude for safety.
diff = goal_pos - present_pos
max_relative_target = torch.tensor(max_relative_target)
safe_diff = torch.minimum(diff, max_relative_target)
safe_diff = torch.maximum(safe_diff, -max_relative_target)
safe_goal_pos = present_pos + safe_diff
if not torch.allclose(goal_pos, safe_goal_pos):
logging.warning(
"Relative goal position magnitude had to be clamped to be safe.\n"
f" requested relative goal position target: {diff}\n"
f" clamped relative goal position target: {safe_diff}"
)
return safe_goal_pos
########################################################################
# Alexander Koch robot arm
# Manipulator robot
########################################################################
@dataclass
class KochRobotConfig:
class ManipulatorRobotConfig:
"""
Example of usage:
```python
KochRobotConfig()
ManipulatorRobotConfig()
```
"""
# Define all components of the robot
robot_type: str | None = None
leader_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
follower_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
cameras: dict[str, Camera] = field(default_factory=lambda: {})
@@ -191,14 +208,15 @@ class KochRobotConfig:
super().__setattr__(prop, val)
class KochRobot:
class ManipulatorRobot:
# TODO(rcadene): Implement force feedback
"""This class allows to control any Koch robot of various number of motors.
"""This class allows to control any manipulator robot of various number of motors.
A few versions are available:
- [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, which was developed
by Alexander Koch from [Tau Robotics](https://tau-robotics.com): [Github for sourcing and assembly](
- [Koch v1.1])https://github.com/jess-moss/koch-v1-1), which was developed by Jess Moss.
Non exaustive list of robots:
- [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, developed
by Alexander Koch from [Tau Robotics](https://tau-robotics.com)
- [Koch v1.1](https://github.com/jess-moss/koch-v1-1) developed by Jess Moss
- [Aloha](https://www.trossenrobotics.com/aloha-kits) developed by Trossen Robotics
Example of highest frequency teleoperation without camera:
```python
@@ -231,7 +249,9 @@ class KochRobot:
},
),
}
robot = KochRobot(
robot = ManipulatorRobot(
robot_type="koch",
calibration_dir=".cache/calibration/koch",
leader_arms=leader_arms,
follower_arms=follower_arms,
)
@@ -246,7 +266,9 @@ class KochRobot:
Example of highest frequency data collection without camera:
```python
# Assumes leader and follower arms have been instantiated already (see first example)
robot = KochRobot(
robot = ManipulatorRobot(
robot_type="koch",
calibration_dir=".cache/calibration/koch",
leader_arms=leader_arms,
follower_arms=follower_arms,
)
@@ -267,7 +289,9 @@ class KochRobot:
}
# Assumes leader and follower arms have been instantiated already (see first example)
robot = KochRobot(
robot = ManipulatorRobot(
robot_type="koch",
calibration_dir=".cache/calibration/koch",
leader_arms=leader_arms,
follower_arms=follower_arms,
cameras=cameras,
@@ -280,7 +304,9 @@ class KochRobot:
Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency):
```python
# Assumes leader and follower arms + cameras have been instantiated already (see previous example)
robot = KochRobot(
robot = ManipulatorRobot(
robot_type="koch",
calibration_dir=".cache/calibration/koch",
leader_arms=leader_arms,
follower_arms=follower_arms,
cameras=cameras,
@@ -306,16 +332,17 @@ class KochRobot:
def __init__(
self,
config: KochRobotConfig | None = None,
calibration_path: Path = ".cache/calibration/koch.pkl",
config: ManipulatorRobotConfig | None = None,
calibration_dir: Path = ".cache/calibration/koch",
**kwargs,
):
if config is None:
config = KochRobotConfig()
config = ManipulatorRobotConfig()
# Overwrite config arguments using kwargs
self.config = replace(config, **kwargs)
self.calibration_path = Path(calibration_path)
self.calibration_dir = Path(calibration_dir)
self.robot_type = self.config.robot_type
self.leader_arms = self.config.leader_arms
self.follower_arms = self.config.follower_arms
self.cameras = self.config.cameras
@@ -325,12 +352,12 @@ class KochRobot:
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"KochRobot is already connected. Do not run `robot.connect()` twice."
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
)
if not self.leader_arms and not self.follower_arms and not self.cameras:
raise ValueError(
"KochRobot doesn't have any device to connect. See example of usage in docstring of the class."
"ManipulatorRobot doesn't have any device to connect. See example of usage in docstring of the class."
)
# Connect the arms
@@ -340,38 +367,22 @@ class KochRobot:
print(f"Connecting {name} leader arm.")
self.leader_arms[name].connect()
# Reset the arms and load or run calibration
if self.calibration_path.exists():
# Reset all arms before setting calibration
for name in self.follower_arms:
reset_torque_mode(self.follower_arms[name])
for name in self.leader_arms:
reset_torque_mode(self.leader_arms[name])
with open(self.calibration_path, "rb") as f:
calibration = pickle.load(f)
else:
print(f"Missing calibration file '{self.calibration_path}'. Starting calibration precedure.")
# Run calibration process which begins by reseting all arms
calibration = self.run_calibration()
print(f"Calibration is done! Saving calibration file '{self.calibration_path}'")
self.calibration_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_path, "wb") as f:
pickle.dump(calibration, f)
# Set calibration
# We assume that at connection time, arms are in a rest position, and torque can
# be safely disabled to run calibration and/or set robot preset configurations.
for name in self.follower_arms:
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
self.follower_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value)
for name in self.leader_arms:
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value)
# Set better PID values to close the gap between recored states and actions
# TODO(rcadene): Implement an automatic procedure to set optimial PID values for each motor
for name in self.follower_arms:
self.follower_arms[name].write("Position_P_Gain", 1500, "elbow_flex")
self.follower_arms[name].write("Position_I_Gain", 0, "elbow_flex")
self.follower_arms[name].write("Position_D_Gain", 600, "elbow_flex")
self.activate_calibration()
# Set robot preset (e.g. torque in leader gripper for Koch v1.1)
if self.robot_type == "koch":
self.set_koch_robot_preset()
elif self.robot_type == "aloha":
self.set_aloha_robot_preset()
else:
warnings.warn(f"No preset found for robot type: {self.robot_type}", stacklevel=1)
# Enable torque on all motors of the follower arms
for name in self.follower_arms:
@@ -391,31 +402,121 @@ class KochRobot:
self.is_connected = True
def run_calibration(self):
calibration = {}
def activate_calibration(self):
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
def load_or_run_calibration_(name, arm, arm_type):
arm_id = get_arm_id(name, arm_type)
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
return calibration
for name, arm in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm, "follower")
arm.set_calibration(calibration)
for name, arm in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm, "leader")
arm.set_calibration(calibration)
def set_koch_robot_preset(self):
def set_operating_mode_(arm):
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run set robot preset, the torque must be disabled on all motors.")
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
if len(all_motors_except_gripper) > 0:
# 4 corresponds to Extended Position on Koch motors
arm.write("Operating_Mode", 4, all_motors_except_gripper)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
arm.write("Operating_Mode", 5, "gripper")
for name in self.follower_arms:
homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], name, "follower")
set_operating_mode_(self.follower_arms[name])
calibration[f"follower_{name}"] = {}
for idx, motor_name in enumerate(self.follower_arms[name].motor_names):
calibration[f"follower_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
# Set better PID values to close the gap between recorded states and actions
# TODO(rcadene): Implement an automatic procedure to set optimial PID values for each motor
self.follower_arms[name].write("Position_P_Gain", 1500, "elbow_flex")
self.follower_arms[name].write("Position_I_Gain", 0, "elbow_flex")
self.follower_arms[name].write("Position_D_Gain", 600, "elbow_flex")
if self.config.gripper_open_degree is not None:
for name in self.leader_arms:
set_operating_mode_(self.leader_arms[name])
# Enable torque on the gripper of the leader arms, and move it to 45 degrees,
# so that we can use it as a trigger to close the gripper of the follower arms.
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper")
def set_aloha_robot_preset(self):
def set_shadow_(arm):
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
# As a result, if only one of them is required to move to a certain position,
# the other will follow. This is to avoid breaking the motors.
if "shoulder_shadow" in arm.motor_names:
shoulder_idx = arm.read("ID", "shoulder")
arm.write("Secondary_ID", shoulder_idx, "shoulder_shadow")
if "elbow_shadow" in arm.motor_names:
elbow_idx = arm.read("ID", "elbow")
arm.write("Secondary_ID", elbow_idx, "elbow_shadow")
for name in self.follower_arms:
set_shadow_(self.follower_arms[name])
for name in self.leader_arms:
homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], name, "leader")
set_shadow_(self.leader_arms[name])
calibration[f"leader_{name}"] = {}
for idx, motor_name in enumerate(self.leader_arms[name].motor_names):
calibration[f"leader_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
for name in self.follower_arms:
# Set a velocity limit of 131 as advised by Trossen Robotics
self.follower_arms[name].write("Velocity_Limit", 131)
return calibration
# Use 'position control current based' for follower gripper to be limited by the limit of the current.
# It can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# 5 corresponds to Current Controlled Position on Aloha gripper follower "xm430-w350"
self.follower_arms[name].write("Operating_Mode", 5, "gripper")
# Note: We can't enable torque on the leader gripper since "xc430-w150" doesn't have
# a Current Controlled Position mode.
if self.config.gripper_open_degree is not None:
warnings.warn(
f"`gripper_open_degree` is set to {self.config.gripper_open_degree}, but None is expected for Aloha instead",
stacklevel=1,
)
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()`."
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
# Prepare to assign the position of the leader to the follower
@@ -423,16 +524,27 @@ class KochRobot:
for name in self.leader_arms:
before_lread_t = time.perf_counter()
leader_pos[name] = self.leader_arms[name].read("Present_Position")
leader_pos[name] = torch.from_numpy(leader_pos[name])
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t
# Send goal position to the follower
follower_goal_pos = {}
for name in self.leader_arms:
follower_goal_pos[name] = leader_pos[name]
# Send action
for name in self.follower_arms:
before_fwrite_t = time.perf_counter()
self.send_action(torch.tensor(follower_goal_pos[name]), [name])
goal_pos = leader_pos[name]
# Cap goal position when too far away from present position.
# Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.follower_arms[name].read("Present_Position")
present_pos = torch.from_numpy(present_pos)
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
# Used when record_data=True
follower_goal_pos[name] = goal_pos
goal_pos = goal_pos.numpy().astype(np.int32)
self.follower_arms[name].write("Goal_Position", goal_pos)
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t
# Early exit when recording data is not requested
@@ -445,6 +557,7 @@ class KochRobot:
for name in self.follower_arms:
before_fread_t = time.perf_counter()
follower_pos[name] = self.follower_arms[name].read("Present_Position")
follower_pos[name] = torch.from_numpy(follower_pos[name])
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
# Create state by concatenating follower current position
@@ -452,29 +565,30 @@ class KochRobot:
for name in self.follower_arms:
if name in follower_pos:
state.append(follower_pos[name])
state = np.concatenate(state)
state = torch.cat(state)
# Create action by concatenating follower goal position
action = []
for name in self.follower_arms:
if name in follower_goal_pos:
action.append(follower_goal_pos[name])
action = np.concatenate(action)
action = torch.cat(action)
# Capture images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries and format to pytorch
# Populate output dictionnaries
obs_dict, action_dict = {}, {}
obs_dict["observation.state"] = torch.from_numpy(state)
action_dict["action"] = torch.from_numpy(action)
obs_dict["observation.state"] = state
action_dict["action"] = action
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict, action_dict
@@ -482,7 +596,7 @@ class KochRobot:
"""The returned observations do not have a batch dimension."""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()`."
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
# Read follower position
@@ -490,6 +604,7 @@ class KochRobot:
for name in self.follower_arms:
before_fread_t = time.perf_counter()
follower_pos[name] = self.follower_arms[name].read("Present_Position")
follower_pos[name] = torch.from_numpy(follower_pos[name])
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
# Create state by concatenating follower current position
@@ -497,82 +612,68 @@ class KochRobot:
for name in self.follower_arms:
if name in follower_pos:
state.append(follower_pos[name])
state = np.concatenate(state)
state = torch.cat(state)
# Capture images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = torch.from_numpy(state)
obs_dict["observation.state"] = state
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict
def send_action(self, action: torch.Tensor, follower_names: list[str] | None = None):
def send_action(self, action: torch.Tensor) -> torch.Tensor:
"""Command the follower arms to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`.
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Args:
action: tensor containing the concatenated joint positions for the follower arms.
follower_names: Pass follower arm names to only control a subset of all the follower arms.
action: tensor containing the concatenated goal positions for the follower arms.
"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()`."
)
if follower_names is None:
follower_names = list(self.follower_arms)
elif not set(follower_names).issubset(self.follower_arms):
raise ValueError(
f"You provided {follower_names=} but only the following arms are registered: "
f"{list(self.follower_arms)}"
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
from_idx = 0
to_idx = 0
follower_goal_pos = {}
for name in follower_names:
action_sent = []
for name in self.follower_arms:
# Get goal position of each follower arm by splitting the action vector
to_idx += len(self.follower_arms[name].motor_names)
this_action = action[from_idx:to_idx]
if self.config.max_relative_target is not None:
if not isinstance(self.config.max_relative_target, list):
max_relative_target = [self.config.max_relative_target for _ in range(from_idx, to_idx)]
max_relative_target = torch.tensor(self.config.max_relative_target)
# Cap relative action target magnitude for safety.
current_pos = torch.tensor(self.follower_arms[name].read("Present_Position"))
diff = this_action - current_pos
safe_diff = torch.minimum(diff, max_relative_target)
safe_diff = torch.maximum(safe_diff, -max_relative_target)
safe_action = current_pos + safe_diff
if not torch.allclose(safe_action, this_action):
logging.warning(
"Relative action magnitude had to be clamped to be safe.\n"
f" requested relative action target: {diff}\n"
f" clamped relative action target: {safe_diff}"
)
follower_goal_pos[name] = safe_action.numpy()
else:
follower_goal_pos[name] = this_action.numpy()
goal_pos = action[from_idx:to_idx]
from_idx = to_idx
for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32))
# Cap goal position when too far away from present position.
# Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.follower_arms[name].read("Present_Position")
present_pos = torch.from_numpy(present_pos)
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
# Save tensor to concat and return
action_sent.append(goal_pos)
# Send goal position to each follower
goal_pos = goal_pos.numpy().astype(np.int32)
self.follower_arms[name].write("Goal_Position", goal_pos)
return torch.cat(action_sent)
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()` before disconnecting."
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
)
for name in self.follower_arms:

View File

@@ -1,6 +1,13 @@
from typing import Protocol
def get_arm_id(name, arm_type):
"""Returns the string identifier of a robot arm. For instance, for a bimanual manipulator
like Aloha, it could be left_follower, right_follower, left_leader, or right_leader.
"""
return f"{name}_{arm_type}"
class Robot(Protocol):
def init_teleop(self): ...
def run_calibration(self): ...

View File

@@ -1,3 +1,15 @@
import time
def busy_wait(seconds):
# Significantly more accurate than `time.sleep`, and mandatory for our use case,
# but it consumes CPU cycles.
# TODO(rcadene): find an alternative: from python 11, time.sleep is precise
end_time = time.perf_counter() + seconds
while time.perf_counter() < end_time:
pass
class RobotDeviceNotConnectedError(Exception):
"""Exception raised when the robot device is not connected."""