auto calibration works

This commit is contained in:
Remi Cadene
2024-10-19 17:52:47 +02:00
parent 994209d1b0
commit 48da694883
5 changed files with 283 additions and 49 deletions

View File

@@ -127,8 +127,8 @@ MODEL_BAUDRATE_TABLE = {
"sts3215": SCS_SERIES_BAUDRATE_TABLE, "sts3215": SCS_SERIES_BAUDRATE_TABLE,
} }
NUM_READ_RETRY = 2 NUM_READ_RETRY = 20
NUM_WRITE_RETRY = 2 NUM_WRITE_RETRY = 20
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray: def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
@@ -656,7 +656,7 @@ class FeetechMotorsBus:
return values return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name): def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
return_list = True return_list = True
if not isinstance(motor_ids, list): if not isinstance(motor_ids, list):
return_list = False return_list = False
@@ -668,7 +668,7 @@ class FeetechMotorsBus:
for idx in motor_ids: for idx in motor_ids:
group.addParam(idx) group.addParam(idx)
for _ in range(NUM_READ_RETRY): for _ in range(num_retry):
comm = group.txRxPacket() comm = group.txRxPacket()
if comm == COMM_SUCCESS: if comm == COMM_SUCCESS:
break break
@@ -758,7 +758,7 @@ class FeetechMotorsBus:
return values return values
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values): def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
if not isinstance(motor_ids, list): if not isinstance(motor_ids, list):
motor_ids = [motor_ids] motor_ids = [motor_ids]
if not isinstance(values, list): if not isinstance(values, list):
@@ -771,7 +771,11 @@ class FeetechMotorsBus:
data = convert_to_bytes(value, bytes) data = convert_to_bytes(value, bytes)
group.addParam(idx, data) group.addParam(idx, data)
comm = group.txPacket() for _ in range(num_retry):
comm = group.txPacket()
if comm == COMM_SUCCESS:
break
if comm != COMM_SUCCESS: if comm != COMM_SUCCESS:
raise ConnectionError( raise ConnectionError(
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: " f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
@@ -797,8 +801,6 @@ class FeetechMotorsBus:
values = np.array(values) values = np.array(values)
print(values)
motor_ids = [] motor_ids = []
models = [] models = []
for name in motor_names: for name in motor_names:

View File

@@ -36,9 +36,9 @@ def apply_drive_mode(position, drive_mode):
def compute_nearest_rounded_position(position, models): def compute_nearest_rounded_position(position, models):
# delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models) delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models)
# nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn
# return nearest_pos.astype(position.dtype) return nearest_pos.astype(position.dtype)
return position return position

View File

@@ -1,6 +1,8 @@
"""Logic to calibrate a robot arm built with feetech motors""" """Logic to calibrate a robot arm built with feetech motors"""
# TODO(rcadene, aliberts): move this logic into the robot code when refactoring # TODO(rcadene, aliberts): move this logic into the robot code when refactoring
import time
import numpy as np import numpy as np
from lerobot.common.robot_devices.motors.feetech import ( from lerobot.common.robot_devices.motors.feetech import (
@@ -42,7 +44,227 @@ def compute_nearest_rounded_position(position, models):
return position return position
def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str): def move_until_block(arm, motor_name, positive_direction=True, while_move_hook=None):
count = 0
while True:
present_pos = arm.read("Present_Position", motor_name)
if positive_direction:
arm.write("Goal_Position", present_pos + 100, motor_name)
else:
arm.write("Goal_Position", present_pos - 100, motor_name)
if while_move_hook is not None:
while_move_hook()
present_pos = arm.read("Present_Position", motor_name).item()
present_speed = arm.read("Present_Speed", motor_name).item()
present_load = arm.read("Present_Load", motor_name).item()
# present_voltage = arm.read("Present_Voltage", motor_name).item()
# present_temperature = arm.read("Present_Temperature", motor_name).item()
# print(f"{present_pos=}")
# print(f"{present_speed=}")
# print(f"{present_load=}")
# print(f"{present_voltage=}")
# print(f"{present_temperature=}")
if present_load > 100 and present_speed == 0:
count += 1
if count > 100:
return present_pos
else:
count = 0
def move_to_calibrate(arm, motor_name, positive_first=True, in_between_move_hook=None, while_move_hook=None):
initial_pos = arm.read("Present_Position", motor_name)
if positive_first:
p_present_pos = move_until_block(
arm, motor_name, positive_direction=True, while_move_hook=while_move_hook
)
p_delta_pos = abs(initial_pos - p_present_pos)
if in_between_move_hook is not None:
in_between_move_hook()
n_present_pos = move_until_block(
arm, motor_name, positive_direction=False, while_move_hook=while_move_hook
)
n_delta_pos = abs(initial_pos - n_present_pos)
else:
n_present_pos = move_until_block(
arm, motor_name, positive_direction=False, while_move_hook=while_move_hook
)
n_delta_pos = abs(initial_pos - n_present_pos)
if in_between_move_hook is not None:
in_between_move_hook()
p_present_pos = move_until_block(
arm, motor_name, positive_direction=True, while_move_hook=while_move_hook
)
p_delta_pos = abs(initial_pos - p_present_pos)
if p_delta_pos < n_delta_pos:
invert_drive_mode = False
min_pos = n_present_pos
max_pos = p_present_pos
zero_pos = (min_pos + max_pos) / 2
homing_offset = -zero_pos
else:
invert_drive_mode = True
min_pos = p_present_pos
max_pos = n_present_pos
zero_pos = (min_pos + max_pos) / 2
homing_offset = zero_pos
calib_data = {
"homing_offset": homing_offset,
"invert_drive_mode": invert_drive_mode,
"drive_mode": -1 if invert_drive_mode else 0,
"zero_pos": zero_pos,
"min_pos": min_pos,
"max_pos": max_pos,
}
return calib_data
def apply_offset(calib, offset):
calib["zero_pos"] += offset
if calib["drive_mode"]:
calib["homing_offset"] += offset
else:
calib["homing_offset"] -= offset
return calib
def run_arm_auto_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to initial position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial"))
input("Press Enter to continue...")
arm.write("Lock", 0)
arm.write("Acceleration", 10)
time.sleep(1)
arm.write("Torque_Enable", TorqueMode.ENABLED.value)
calib = {}
# Calibrate shoulder_pan
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan")
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
time.sleep(1)
# Calibrate elbow_flex
calib["elbow_flex"] = move_to_calibrate(arm, "elbow_flex")
calib["elbow_flex"] = apply_offset(calib["elbow_flex"], offset=130 - 1024)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 1024, "elbow_flex")
time.sleep(1)
# Calibrate wrist_flex
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex")
calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=100)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex")
time.sleep(1)
# Calibrate gripper
calib["gripper"] = move_to_calibrate(arm, "gripper")
tmp_max_pos = calib["gripper"]["max_pos"]
calib["gripper"]["max_pos"] = calib["gripper"]["min_pos"]
calib["gripper"]["min_pos"] = tmp_max_pos
arm.write("Goal_Position", calib["gripper"]["min_pos"], "gripper")
time.sleep(1)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1350, "wrist_flex")
time.sleep(1)
# Calibrate shoulder_lift
def in_between_move_hook():
nonlocal arm, calib
initial_pos = arm.read("Present_Position", "shoulder_lift")
arm.write("Goal_Position", initial_pos + 900, "shoulder_lift")
time.sleep(1)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] - 150, "elbow_flex")
time.sleep(1)
calib["shoulder_lift"] = move_to_calibrate(
arm, "shoulder_lift", positive_first=False, in_between_move_hook=in_between_move_hook
)
# add an 30 steps as offset to align with body
calib["shoulder_lift"] = apply_offset(calib["shoulder_lift"], offset=30 + 1024 - 120)
# Calibrate wrist_roll
def while_move_hook():
nonlocal arm, calib
positions = {
"shoulder_lift": round(calib["shoulder_lift"]["zero_pos"] - 1600),
"elbow_flex": round(calib["elbow_flex"]["zero_pos"] + 1700),
"wrist_flex": round(calib["wrist_flex"]["zero_pos"] + 800),
"gripper": round(calib["gripper"]["max_pos"] - 400),
}
arm.write("Goal_Position", list(positions.values()), list(positions.keys()))
arm.write("Goal_Position", round(calib["shoulder_lift"]["zero_pos"] - 1600), "shoulder_lift")
time.sleep(2)
arm.write("Goal_Position", round(calib["elbow_flex"]["zero_pos"] + 1700), "elbow_flex")
time.sleep(2)
arm.write("Goal_Position", round(calib["wrist_flex"]["zero_pos"] + 800), "wrist_flex")
time.sleep(2)
arm.write("Goal_Position", round(calib["gripper"]["max_pos"] - 400), "gripper")
time.sleep(2)
calib["wrist_roll"] = move_to_calibrate(
arm, "wrist_roll", positive_first=False, while_move_hook=while_move_hook
)
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"], "wrist_roll")
time.sleep(1)
arm.write("Goal_Position", calib["gripper"]["min_pos"], "gripper")
time.sleep(1)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex")
time.sleep(1)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 2048, "elbow_flex")
arm.write("Goal_Position", calib["shoulder_lift"]["zero_pos"] - 2048, "shoulder_lift")
time.sleep(1)
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
time.sleep(1)
calib_modes = []
for name in arm.motor_names:
if name == "gripper":
calib_modes.append(CalibrationMode.LINEAR.name)
else:
calib_modes.append(CalibrationMode.DEGREE.name)
calib_dict = {
"homing_offset": [calib[name]["homing_offset"] for name in arm.motor_names],
"drive_mode": [calib[name]["drive_mode"] for name in arm.motor_names],
"start_pos": [calib[name]["min_pos"] for name in arm.motor_names],
"end_pos": [calib[name]["max_pos"] for name in arm.motor_names],
"calib_mode": calib_modes,
"motor_names": arm.motor_names,
}
return calib_dict
def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""This function ensures that a neural network trained on data collected on a given robot """This function ensures that a neural network trained on data collected on a given robot
can work on another robot. For instance before calibration, setting a same goal position can work on another robot. For instance before calibration, setting a same goal position
for each motor of two different robots will get two very different positions. But after calibration, for each motor of two different robots will get two very different positions. But after calibration,
@@ -80,8 +302,7 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
# Compute homing offset so that `present_position + homing_offset ~= target_position`. # Compute homing offset so that `present_position + homing_offset ~= target_position`.
zero_pos = arm.read("Present_Position") zero_pos = arm.read("Present_Position")
zero_nearest_pos = compute_nearest_rounded_position(zero_pos, arm.motor_models) homing_offset = zero_target_pos - zero_pos
homing_offset = zero_target_pos - zero_nearest_pos
# The rotated target position corresponds to a rotation of a quarter turn from the zero position. # The rotated target position corresponds to a rotation of a quarter turn from the zero position.
# This allows to identify the rotation direction of each motor. # This allows to identify the rotation direction of each motor.
@@ -103,8 +324,7 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
# Re-compute homing offset to take into account drive mode # Re-compute homing offset to take into account drive mode
rotated_drived_pos = apply_drive_mode(rotated_pos, drive_mode) rotated_drived_pos = apply_drive_mode(rotated_pos, drive_mode)
rotated_nearest_pos = compute_nearest_rounded_position(rotated_drived_pos, arm.motor_models) homing_offset = rotated_target_pos - rotated_drived_pos
homing_offset = rotated_target_pos - rotated_nearest_pos
print("\nMove arm to rest position") print("\nMove arm to rest position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest")) print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest"))
@@ -112,20 +332,19 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
print() print()
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180] # Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
calib_mode = [CalibrationMode.DEGREE.name] * len(arm.motor_names) calib_modes = []
for name in arm.motor_names:
# TODO(rcadene): make type of joints (DEGREE or LINEAR) configurable from yaml? if name == "gripper":
if robot_type in ["so100", "moss"] and "gripper" in arm.motor_names: calib_modes.append(CalibrationMode.LINEAR.name)
# Joints with linear motions (like gripper of Aloha) are experessed in nominal range of [0, 100] else:
calib_idx = arm.motor_names.index("gripper") calib_modes.append(CalibrationMode.DEGREE.name)
calib_mode[calib_idx] = CalibrationMode.LINEAR.name
calib_data = { calib_data = {
"homing_offset": homing_offset.tolist(), "homing_offset": homing_offset.tolist(),
"drive_mode": drive_mode.tolist(), "drive_mode": drive_mode.tolist(),
"start_pos": zero_pos.tolist(), "start_pos": zero_pos.tolist(),
"end_pos": rotated_pos.tolist(), "end_pos": rotated_pos.tolist(),
"calib_mode": calib_mode, "calib_mode": calib_modes,
"motor_names": arm.motor_names, "motor_names": arm.motor_names,
} }
return calib_data return calib_data

View File

@@ -332,10 +332,22 @@ class ManipulatorRobot:
if self.robot_type in ["koch", "aloha"]: if self.robot_type in ["koch", "aloha"]:
from lerobot.common.robot_devices.robots.dynamixel_calibration import run_arm_calibration from lerobot.common.robot_devices.robots.dynamixel_calibration import run_arm_calibration
elif self.robot_type in ["so100", "moss"]:
from lerobot.common.robot_devices.robots.feetech_calibration import run_arm_calibration
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type) calibration = run_arm_calibration(arm, self.robot_type, name, arm_type)
elif self.robot_type in ["so100", "moss"]:
from lerobot.common.robot_devices.robots.feetech_calibration import (
run_arm_auto_calibration,
run_arm_manual_calibration,
)
if arm_type == "leader":
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
elif arm_type == "follower":
calibration = run_arm_auto_calibration(arm, self.robot_type, name, arm_type)
else:
raise ValueError(arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True) arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
@@ -454,9 +466,11 @@ class ManipulatorRobot:
# Close the write lock so that Maximum_Acceleration gets written to EPROM address, # Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting. # which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.follower_arms[name].write("Lock", 0) self.follower_arms[name].write("Lock", 0)
# Set Maximum_Acceleration to 250 to speedup acceleration and deceleration of # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table # the motors. Note: this configuration is not in the official STS3215 Memory Table
self.follower_arms[name].write("Maximum_Acceleration", 250) self.follower_arms[name].write("Maximum_Acceleration", 254)
self.follower_arms[name].write("Acceleration", 254)
time.sleep(1)
def teleop_step( def teleop_step(
self, record_data=False self, record_data=False

View File

@@ -18,13 +18,13 @@ import time
def configure_motor(port, brand, model, motor_idx_des, baudrate_des): def configure_motor(port, brand, model, motor_idx_des, baudrate_des):
if brand == "feetech": if brand == "feetech":
from lerobot.common.robot_devices.motors.feetech import MODEL_BAUDRATE_TABLE, NUM_WRITE_RETRY from lerobot.common.robot_devices.motors.feetech import MODEL_BAUDRATE_TABLE
from lerobot.common.robot_devices.motors.feetech import ( from lerobot.common.robot_devices.motors.feetech import (
SCS_SERIES_BAUDRATE_TABLE as SERIES_BAUDRATE_TABLE, SCS_SERIES_BAUDRATE_TABLE as SERIES_BAUDRATE_TABLE,
) )
from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus as MotorsBusClass from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus as MotorsBusClass
elif brand == "dynamixel": elif brand == "dynamixel":
from lerobot.common.robot_devices.motors.dynamixel import MODEL_BAUDRATE_TABLE, NUM_WRITE_RETRY from lerobot.common.robot_devices.motors.dynamixel import MODEL_BAUDRATE_TABLE
from lerobot.common.robot_devices.motors.dynamixel import ( from lerobot.common.robot_devices.motors.dynamixel import (
X_SERIES_BAUDRATE_TABLE as SERIES_BAUDRATE_TABLE, X_SERIES_BAUDRATE_TABLE as SERIES_BAUDRATE_TABLE,
) )
@@ -82,40 +82,39 @@ def configure_motor(port, brand, model, motor_idx_des, baudrate_des):
print(f"Motor index found at: {motor_index}") print(f"Motor index found at: {motor_index}")
if brand == "feetech":
# Allows ID and BAUDRATE to be written in memory
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Lock", 0)
if baudrate != baudrate_des: if baudrate != baudrate_des:
print(f"Setting its baudrate to {baudrate_des}") print(f"Setting its baudrate to {baudrate_des}")
baudrate_idx = list(SERIES_BAUDRATE_TABLE.values()).index(baudrate_des) baudrate_idx = list(SERIES_BAUDRATE_TABLE.values()).index(baudrate_des)
# The write can fail, so we allow retries # The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY): motor_bus.write_with_motor_ids(
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Baud_Rate", baudrate_idx) motor_bus.motor_models, motor_index, "Baud_Rate", baudrate_idx, num_retry=2
time.sleep(0.5) )
motor_bus.set_bus_baudrate(baudrate_des) time.sleep(0.5)
try: motor_bus.set_bus_baudrate(baudrate_des)
present_baudrate_idx = motor_bus.read_with_motor_ids( present_baudrate_idx = motor_bus.read_with_motor_ids(
motor_bus.motor_models, motor_index, "Baud_Rate" motor_bus.motor_models, motor_index, "Baud_Rate", num_retry=2
) )
except ConnectionError:
print("Failed to write baudrate. Retrying.")
motor_bus.set_bus_baudrate(baudrate)
continue
break
else:
raise OSError("Failed to write baudrate.")
if present_baudrate_idx != baudrate_idx: if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.") raise OSError("Failed to write baudrate.")
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Lock", 0)
print(f"Setting its index to desired index {motor_idx_des}") print(f"Setting its index to desired index {motor_idx_des}")
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "ID", motor_idx_des) motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "ID", motor_idx_des)
present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID") present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID", num_retry=2)
if present_idx != motor_idx_des: if present_idx != motor_idx_des:
raise OSError("Failed to write index.") raise OSError("Failed to write index.")
if brand == "feetech": if brand == "feetech":
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
motor_bus.write("Maximum_Acceleration", 254)
motor_bus.write("Goal_Position", 2047) motor_bus.write("Goal_Position", 2047)
time.sleep(4) time.sleep(4)
print("Present Position", motor_bus.read("Present_Position")) print("Present Position", motor_bus.read("Present_Position"))