diff --git a/lerobot/common/robot_devices/motors/feetech.py b/lerobot/common/robot_devices/motors/feetech.py index f61b693e..3c134d64 100644 --- a/lerobot/common/robot_devices/motors/feetech.py +++ b/lerobot/common/robot_devices/motors/feetech.py @@ -127,8 +127,8 @@ MODEL_BAUDRATE_TABLE = { "sts3215": SCS_SERIES_BAUDRATE_TABLE, } -NUM_READ_RETRY = 2 -NUM_WRITE_RETRY = 2 +NUM_READ_RETRY = 20 +NUM_WRITE_RETRY = 20 def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray: @@ -656,7 +656,7 @@ class FeetechMotorsBus: return values - def read_with_motor_ids(self, motor_models, motor_ids, data_name): + def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY): return_list = True if not isinstance(motor_ids, list): return_list = False @@ -668,7 +668,7 @@ class FeetechMotorsBus: for idx in motor_ids: group.addParam(idx) - for _ in range(NUM_READ_RETRY): + for _ in range(num_retry): comm = group.txRxPacket() if comm == COMM_SUCCESS: break @@ -758,7 +758,7 @@ class FeetechMotorsBus: return values - def write_with_motor_ids(self, motor_models, motor_ids, data_name, values): + def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY): if not isinstance(motor_ids, list): motor_ids = [motor_ids] if not isinstance(values, list): @@ -771,7 +771,11 @@ class FeetechMotorsBus: data = convert_to_bytes(value, bytes) group.addParam(idx, data) - comm = group.txPacket() + for _ in range(num_retry): + comm = group.txPacket() + if comm == COMM_SUCCESS: + break + if comm != COMM_SUCCESS: raise ConnectionError( f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: " @@ -797,8 +801,6 @@ class FeetechMotorsBus: values = np.array(values) - print(values) - motor_ids = [] models = [] for name in motor_names: diff --git a/lerobot/common/robot_devices/robots/dynamixel_calibration.py b/lerobot/common/robot_devices/robots/dynamixel_calibration.py index a3c9d323..2d582c89 100644 --- a/lerobot/common/robot_devices/robots/dynamixel_calibration.py +++ b/lerobot/common/robot_devices/robots/dynamixel_calibration.py @@ -36,9 +36,9 @@ def apply_drive_mode(position, drive_mode): def compute_nearest_rounded_position(position, models): - # delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models) - # nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn - # return nearest_pos.astype(position.dtype) + delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models) + nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn + return nearest_pos.astype(position.dtype) return position diff --git a/lerobot/common/robot_devices/robots/feetech_calibration.py b/lerobot/common/robot_devices/robots/feetech_calibration.py index e3e0c6e1..cb67a549 100644 --- a/lerobot/common/robot_devices/robots/feetech_calibration.py +++ b/lerobot/common/robot_devices/robots/feetech_calibration.py @@ -1,6 +1,8 @@ """Logic to calibrate a robot arm built with feetech motors""" # TODO(rcadene, aliberts): move this logic into the robot code when refactoring +import time + import numpy as np from lerobot.common.robot_devices.motors.feetech import ( @@ -42,7 +44,227 @@ def compute_nearest_rounded_position(position, models): return position -def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str): +def move_until_block(arm, motor_name, positive_direction=True, while_move_hook=None): + count = 0 + while True: + present_pos = arm.read("Present_Position", motor_name) + if positive_direction: + arm.write("Goal_Position", present_pos + 100, motor_name) + else: + arm.write("Goal_Position", present_pos - 100, motor_name) + + if while_move_hook is not None: + while_move_hook() + + present_pos = arm.read("Present_Position", motor_name).item() + present_speed = arm.read("Present_Speed", motor_name).item() + present_load = arm.read("Present_Load", motor_name).item() + # present_voltage = arm.read("Present_Voltage", motor_name).item() + # present_temperature = arm.read("Present_Temperature", motor_name).item() + + # print(f"{present_pos=}") + # print(f"{present_speed=}") + # print(f"{present_load=}") + # print(f"{present_voltage=}") + # print(f"{present_temperature=}") + + if present_load > 100 and present_speed == 0: + count += 1 + if count > 100: + return present_pos + else: + count = 0 + + +def move_to_calibrate(arm, motor_name, positive_first=True, in_between_move_hook=None, while_move_hook=None): + initial_pos = arm.read("Present_Position", motor_name) + + if positive_first: + p_present_pos = move_until_block( + arm, motor_name, positive_direction=True, while_move_hook=while_move_hook + ) + p_delta_pos = abs(initial_pos - p_present_pos) + + if in_between_move_hook is not None: + in_between_move_hook() + + n_present_pos = move_until_block( + arm, motor_name, positive_direction=False, while_move_hook=while_move_hook + ) + n_delta_pos = abs(initial_pos - n_present_pos) + else: + n_present_pos = move_until_block( + arm, motor_name, positive_direction=False, while_move_hook=while_move_hook + ) + n_delta_pos = abs(initial_pos - n_present_pos) + + if in_between_move_hook is not None: + in_between_move_hook() + + p_present_pos = move_until_block( + arm, motor_name, positive_direction=True, while_move_hook=while_move_hook + ) + p_delta_pos = abs(initial_pos - p_present_pos) + + if p_delta_pos < n_delta_pos: + invert_drive_mode = False + min_pos = n_present_pos + max_pos = p_present_pos + zero_pos = (min_pos + max_pos) / 2 + homing_offset = -zero_pos + else: + invert_drive_mode = True + min_pos = p_present_pos + max_pos = n_present_pos + zero_pos = (min_pos + max_pos) / 2 + homing_offset = zero_pos + + calib_data = { + "homing_offset": homing_offset, + "invert_drive_mode": invert_drive_mode, + "drive_mode": -1 if invert_drive_mode else 0, + "zero_pos": zero_pos, + "min_pos": min_pos, + "max_pos": max_pos, + } + return calib_data + + +def apply_offset(calib, offset): + calib["zero_pos"] += offset + if calib["drive_mode"]: + calib["homing_offset"] += offset + else: + calib["homing_offset"] -= offset + return calib + + +def run_arm_auto_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str): + if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any(): + raise ValueError("To run calibration, the torque must be disabled on all motors.") + + print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...") + + print("\nMove arm to initial position") + print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial")) + input("Press Enter to continue...") + + arm.write("Lock", 0) + arm.write("Acceleration", 10) + time.sleep(1) + + arm.write("Torque_Enable", TorqueMode.ENABLED.value) + + calib = {} + + # Calibrate shoulder_pan + + calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan") + arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan") + time.sleep(1) + + # Calibrate elbow_flex + + calib["elbow_flex"] = move_to_calibrate(arm, "elbow_flex") + calib["elbow_flex"] = apply_offset(calib["elbow_flex"], offset=130 - 1024) + + arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 1024, "elbow_flex") + time.sleep(1) + + # Calibrate wrist_flex + + calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex") + calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=100) + + arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex") + time.sleep(1) + + # Calibrate gripper + + calib["gripper"] = move_to_calibrate(arm, "gripper") + + tmp_max_pos = calib["gripper"]["max_pos"] + calib["gripper"]["max_pos"] = calib["gripper"]["min_pos"] + calib["gripper"]["min_pos"] = tmp_max_pos + + arm.write("Goal_Position", calib["gripper"]["min_pos"], "gripper") + time.sleep(1) + arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1350, "wrist_flex") + time.sleep(1) + + # Calibrate shoulder_lift + + def in_between_move_hook(): + nonlocal arm, calib + initial_pos = arm.read("Present_Position", "shoulder_lift") + arm.write("Goal_Position", initial_pos + 900, "shoulder_lift") + time.sleep(1) + arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] - 150, "elbow_flex") + time.sleep(1) + + calib["shoulder_lift"] = move_to_calibrate( + arm, "shoulder_lift", positive_first=False, in_between_move_hook=in_between_move_hook + ) + # add an 30 steps as offset to align with body + calib["shoulder_lift"] = apply_offset(calib["shoulder_lift"], offset=30 + 1024 - 120) + + # Calibrate wrist_roll + + def while_move_hook(): + nonlocal arm, calib + positions = { + "shoulder_lift": round(calib["shoulder_lift"]["zero_pos"] - 1600), + "elbow_flex": round(calib["elbow_flex"]["zero_pos"] + 1700), + "wrist_flex": round(calib["wrist_flex"]["zero_pos"] + 800), + "gripper": round(calib["gripper"]["max_pos"] - 400), + } + arm.write("Goal_Position", list(positions.values()), list(positions.keys())) + + arm.write("Goal_Position", round(calib["shoulder_lift"]["zero_pos"] - 1600), "shoulder_lift") + time.sleep(2) + arm.write("Goal_Position", round(calib["elbow_flex"]["zero_pos"] + 1700), "elbow_flex") + time.sleep(2) + arm.write("Goal_Position", round(calib["wrist_flex"]["zero_pos"] + 800), "wrist_flex") + time.sleep(2) + arm.write("Goal_Position", round(calib["gripper"]["max_pos"] - 400), "gripper") + time.sleep(2) + + calib["wrist_roll"] = move_to_calibrate( + arm, "wrist_roll", positive_first=False, while_move_hook=while_move_hook + ) + + arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"], "wrist_roll") + time.sleep(1) + arm.write("Goal_Position", calib["gripper"]["min_pos"], "gripper") + time.sleep(1) + arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex") + time.sleep(1) + arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 2048, "elbow_flex") + arm.write("Goal_Position", calib["shoulder_lift"]["zero_pos"] - 2048, "shoulder_lift") + time.sleep(1) + arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan") + time.sleep(1) + + calib_modes = [] + for name in arm.motor_names: + if name == "gripper": + calib_modes.append(CalibrationMode.LINEAR.name) + else: + calib_modes.append(CalibrationMode.DEGREE.name) + + calib_dict = { + "homing_offset": [calib[name]["homing_offset"] for name in arm.motor_names], + "drive_mode": [calib[name]["drive_mode"] for name in arm.motor_names], + "start_pos": [calib[name]["min_pos"] for name in arm.motor_names], + "end_pos": [calib[name]["max_pos"] for name in arm.motor_names], + "calib_mode": calib_modes, + "motor_names": arm.motor_names, + } + + return calib_dict + + +def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str): """This function ensures that a neural network trained on data collected on a given robot can work on another robot. For instance before calibration, setting a same goal position for each motor of two different robots will get two very different positions. But after calibration, @@ -80,8 +302,7 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type # Compute homing offset so that `present_position + homing_offset ~= target_position`. zero_pos = arm.read("Present_Position") - zero_nearest_pos = compute_nearest_rounded_position(zero_pos, arm.motor_models) - homing_offset = zero_target_pos - zero_nearest_pos + homing_offset = zero_target_pos - zero_pos # The rotated target position corresponds to a rotation of a quarter turn from the zero position. # This allows to identify the rotation direction of each motor. @@ -103,8 +324,7 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type # Re-compute homing offset to take into account drive mode rotated_drived_pos = apply_drive_mode(rotated_pos, drive_mode) - rotated_nearest_pos = compute_nearest_rounded_position(rotated_drived_pos, arm.motor_models) - homing_offset = rotated_target_pos - rotated_nearest_pos + homing_offset = rotated_target_pos - rotated_drived_pos print("\nMove arm to rest position") print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest")) @@ -112,20 +332,19 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type print() # Joints with rotational motions are expressed in degrees in nominal range of [-180, 180] - calib_mode = [CalibrationMode.DEGREE.name] * len(arm.motor_names) - - # TODO(rcadene): make type of joints (DEGREE or LINEAR) configurable from yaml? - if robot_type in ["so100", "moss"] and "gripper" in arm.motor_names: - # Joints with linear motions (like gripper of Aloha) are experessed in nominal range of [0, 100] - calib_idx = arm.motor_names.index("gripper") - calib_mode[calib_idx] = CalibrationMode.LINEAR.name + calib_modes = [] + for name in arm.motor_names: + if name == "gripper": + calib_modes.append(CalibrationMode.LINEAR.name) + else: + calib_modes.append(CalibrationMode.DEGREE.name) calib_data = { "homing_offset": homing_offset.tolist(), "drive_mode": drive_mode.tolist(), "start_pos": zero_pos.tolist(), "end_pos": rotated_pos.tolist(), - "calib_mode": calib_mode, + "calib_mode": calib_modes, "motor_names": arm.motor_names, } return calib_data diff --git a/lerobot/common/robot_devices/robots/manipulator.py b/lerobot/common/robot_devices/robots/manipulator.py index a41399b5..54c60bac 100644 --- a/lerobot/common/robot_devices/robots/manipulator.py +++ b/lerobot/common/robot_devices/robots/manipulator.py @@ -332,10 +332,22 @@ class ManipulatorRobot: if self.robot_type in ["koch", "aloha"]: from lerobot.common.robot_devices.robots.dynamixel_calibration import run_arm_calibration - elif self.robot_type in ["so100", "moss"]: - from lerobot.common.robot_devices.robots.feetech_calibration import run_arm_calibration - calibration = run_arm_calibration(arm, self.robot_type, name, arm_type) + calibration = run_arm_calibration(arm, self.robot_type, name, arm_type) + + elif self.robot_type in ["so100", "moss"]: + from lerobot.common.robot_devices.robots.feetech_calibration import ( + run_arm_auto_calibration, + run_arm_manual_calibration, + ) + + if arm_type == "leader": + calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type) + + elif arm_type == "follower": + calibration = run_arm_auto_calibration(arm, self.robot_type, name, arm_type) + else: + raise ValueError(arm_type) print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") arm_calib_path.parent.mkdir(parents=True, exist_ok=True) @@ -454,9 +466,11 @@ class ManipulatorRobot: # Close the write lock so that Maximum_Acceleration gets written to EPROM address, # which is mandatory for Maximum_Acceleration to take effect after rebooting. self.follower_arms[name].write("Lock", 0) - # Set Maximum_Acceleration to 250 to speedup acceleration and deceleration of + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of # the motors. Note: this configuration is not in the official STS3215 Memory Table - self.follower_arms[name].write("Maximum_Acceleration", 250) + self.follower_arms[name].write("Maximum_Acceleration", 254) + self.follower_arms[name].write("Acceleration", 254) + time.sleep(1) def teleop_step( self, record_data=False diff --git a/lerobot/scripts/configure_motor.py b/lerobot/scripts/configure_motor.py index ba9b3b8d..4459a75f 100644 --- a/lerobot/scripts/configure_motor.py +++ b/lerobot/scripts/configure_motor.py @@ -18,13 +18,13 @@ import time def configure_motor(port, brand, model, motor_idx_des, baudrate_des): if brand == "feetech": - from lerobot.common.robot_devices.motors.feetech import MODEL_BAUDRATE_TABLE, NUM_WRITE_RETRY + from lerobot.common.robot_devices.motors.feetech import MODEL_BAUDRATE_TABLE from lerobot.common.robot_devices.motors.feetech import ( SCS_SERIES_BAUDRATE_TABLE as SERIES_BAUDRATE_TABLE, ) from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus as MotorsBusClass elif brand == "dynamixel": - from lerobot.common.robot_devices.motors.dynamixel import MODEL_BAUDRATE_TABLE, NUM_WRITE_RETRY + from lerobot.common.robot_devices.motors.dynamixel import MODEL_BAUDRATE_TABLE from lerobot.common.robot_devices.motors.dynamixel import ( X_SERIES_BAUDRATE_TABLE as SERIES_BAUDRATE_TABLE, ) @@ -82,40 +82,39 @@ def configure_motor(port, brand, model, motor_idx_des, baudrate_des): print(f"Motor index found at: {motor_index}") + if brand == "feetech": + # Allows ID and BAUDRATE to be written in memory + motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Lock", 0) + if baudrate != baudrate_des: print(f"Setting its baudrate to {baudrate_des}") baudrate_idx = list(SERIES_BAUDRATE_TABLE.values()).index(baudrate_des) # The write can fail, so we allow retries - for _ in range(NUM_WRITE_RETRY): - motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Baud_Rate", baudrate_idx) - time.sleep(0.5) - motor_bus.set_bus_baudrate(baudrate_des) - try: - present_baudrate_idx = motor_bus.read_with_motor_ids( - motor_bus.motor_models, motor_index, "Baud_Rate" - ) - except ConnectionError: - print("Failed to write baudrate. Retrying.") - motor_bus.set_bus_baudrate(baudrate) - continue - break - else: - raise OSError("Failed to write baudrate.") + motor_bus.write_with_motor_ids( + motor_bus.motor_models, motor_index, "Baud_Rate", baudrate_idx, num_retry=2 + ) + time.sleep(0.5) + motor_bus.set_bus_baudrate(baudrate_des) + present_baudrate_idx = motor_bus.read_with_motor_ids( + motor_bus.motor_models, motor_index, "Baud_Rate", num_retry=2 + ) if present_baudrate_idx != baudrate_idx: raise OSError("Failed to write baudrate.") - motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Lock", 0) - print(f"Setting its index to desired index {motor_idx_des}") motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "ID", motor_idx_des) - present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID") + present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID", num_retry=2) if present_idx != motor_idx_des: raise OSError("Failed to write index.") if brand == "feetech": + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of + # the motors. Note: this configuration is not in the official STS3215 Memory Table + motor_bus.write("Maximum_Acceleration", 254) + motor_bus.write("Goal_Position", 2047) time.sleep(4) print("Present Position", motor_bus.read("Present_Position"))