Added a configuration script that can be used for feetech and dynamixel calibration.

This commit is contained in:
jess-moss
2024-09-17 15:44:01 -05:00
parent 0035bb962a
commit 89da9f73b5
3 changed files with 162 additions and 40 deletions

View File

@@ -380,17 +380,6 @@ class DynamixelMotorsBus:
# Set expected baudrate for the bus
self.set_bus_baudrate(BAUDRATE)
if not self.are_motors_configured():
input(
"\n/!\\ A configuration issue has been detected with your motors: \n"
"If it's the first time that you use these motors, press enter to configure your motors... but before "
"verify that all the cables are connected the proper way. If you find an issue, before making a modification, "
"kill the python process, unplug the power cord to not damage the motors, rewire correctly, then plug the power "
"again and relaunch the script.\n"
)
print()
self.configure_motors()
def reconnect(self):
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
@@ -441,7 +430,7 @@ class DynamixelMotorsBus:
if i > 0:
try:
self._read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
@@ -463,13 +452,13 @@ class DynamixelMotorsBus:
# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self._write_with_motor_ids(
self.write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self._read_with_motor_ids(
present_baudrate_idx = self.read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
@@ -484,9 +473,9 @@ class DynamixelMotorsBus:
raise OSError("Failed to write baudrate.")
print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self._write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
present_idx = self._read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")
@@ -503,7 +492,7 @@ class DynamixelMotorsBus:
print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self._write_with_motor_ids(
self.write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()
@@ -520,7 +509,7 @@ class DynamixelMotorsBus:
indices = []
for idx in tqdm.tqdm(possible_ids):
try:
present_idx = self._read_with_motor_ids(self.motor_models, [idx], "ID")[0]
present_idx = self.read_with_motor_ids(self.motor_models, [idx], "ID")[0]
except ConnectionError:
continue
@@ -780,7 +769,7 @@ class DynamixelMotorsBus:
values = np.round(values).astype(np.int32)
return values
def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
def read_with_motor_ids(self, motor_models, motor_ids, data_name):
return_list = True
if not isinstance(motor_ids, list):
return_list = False
@@ -875,7 +864,7 @@ class DynamixelMotorsBus:
return values
def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):

View File

@@ -353,17 +353,6 @@ class FeetechMotorsBus:
# Set expected baudrate for the bus
self.set_bus_baudrate(BAUDRATE)
if not self.are_motors_configured():
input(
"\n/!\\ A configuration issue has been detected with your motors: \n"
"If it's the first time that you use these motors, press enter to configure your motors... but before "
"verify that all the cables are connected the proper way. If you find an issue, before making a modification, "
"kill the python process, unplug the power cord to not damage the motors, rewire correctly, then plug the power "
"again and relaunch the script.\n"
)
print()
self.configure_motors()
def reconnect(self):
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
@@ -414,7 +403,7 @@ class FeetechMotorsBus:
if i > 0:
try:
self._read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
@@ -436,13 +425,13 @@ class FeetechMotorsBus:
# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self._write_with_motor_ids(
self.write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self._read_with_motor_ids(
present_baudrate_idx = self.read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
@@ -457,9 +446,9 @@ class FeetechMotorsBus:
raise OSError("Failed to write baudrate.")
print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self._write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
present_idx = self._read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")
@@ -476,7 +465,7 @@ class FeetechMotorsBus:
print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self._write_with_motor_ids(
self.write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()
@@ -493,7 +482,7 @@ class FeetechMotorsBus:
indices = []
for idx in tqdm.tqdm(possible_ids):
try:
present_idx = self._read_with_motor_ids(self.motor_models, [idx], "ID")[0]
present_idx = self.read_with_motor_ids(self.motor_models, [idx], "ID")[0]
except ConnectionError:
continue
@@ -753,7 +742,7 @@ class FeetechMotorsBus:
values = np.round(values).astype(np.int32)
return values
def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
def read_with_motor_ids(self, motor_models, motor_ids, data_name):
return_list = True
if not isinstance(motor_ids, list):
return_list = False
@@ -848,7 +837,7 @@ class FeetechMotorsBus:
return values
def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):

View File

@@ -0,0 +1,144 @@
import argparse
import importlib
import time
def configure_motor(brand, model, motor_idx_des, baudrate_des):
if brand == "feetech":
motor_bus_class = importlib.import_module(
"lerobot.common.robot_devices.motors.feetech"
).FeetechMotorsBus
baudrate_table = importlib.import_module(
"lerobot.common.robot_devices.motors.feetech"
).SCS_SERIES_BAUDRATE_TABLE
num_write_retry = importlib.import_module(
"lerobot.common.robot_devices.motors.feetech"
).NUM_WRITE_RETRY
model_baud_rate_table = importlib.import_module(
"lerobot.common.robot_devices.motors.feetech"
).MODEL_BAUDRATE_TABLE
elif brand == "dynamixel":
motor_bus_class = importlib.import_module(
"lerobot.common.robot_devices.motors.dynamixel"
).DynamixelMotorsBus
baudrate_table = importlib.import_module(
"lerobot.common.robot_devices.motors.dynamixel"
).X_SERIES_BAUDRATE_TABLE
num_write_retry = importlib.import_module(
"lerobot.common.robot_devices.motors.dynamixel"
).NUM_WRITE_RETRY
model_baud_rate_table = importlib.import_module(
"lerobot.common.robot_devices.motors.dynamixel"
).MODEL_BAUDRATE_TABLE
else:
raise ValueError(
f"Currently we do not support this motor brand: {brand}. We currently support feetech and dynamixel motors."
)
# Check if the provided model exists in the model_baud_rate_table
if model not in model_baud_rate_table:
raise ValueError(
f"Invalid model '{model}' for brand '{brand}'. Supported models: {list(model_baud_rate_table.keys())}"
)
# Setup motor names, indices, and models
motor_name = "motor"
motor_index_arbitrary = motor_idx_des # Use the motor ID passed via argument
motor_model = model # Use the motor model passed via argument
# Initialize the MotorBus with the correct port and motor configurations
motor_bus = motor_bus_class(
port="/dev/ttyACM0", motors={motor_name: (motor_index_arbitrary, motor_model)}
)
# Try to connect to the motor bus and handle any connection-specific errors
try:
motor_bus.connect()
print(f"Connected on port {motor_bus.port}")
except Exception as e:
print(f"Error occurred when connecting to the motor bus: {e}")
return
# Motor bus is connected, proceed with the rest of the operations
try:
print("Scanning all baudrates and motor indices")
all_baudrates = set(baudrate_table.values())
motor_index = -1 # Set the motor index to an out-of-range value.
for baudrate in all_baudrates:
motor_bus.set_bus_baudrate(baudrate)
present_ids = motor_bus.find_motor_indices()
if len(present_ids) > 1:
raise ValueError(
"Error: More than one motor ID detected. This script is designed to only handle one motor at a time. Please disconnect all but one motor."
)
if len(present_ids) == 1:
if motor_index != -1:
raise ValueError(
"Error: More than one motor ID detected. This script is designed to only handle one motor at a time. Please disconnect all but one motor."
)
motor_index = present_ids[0]
if motor_index == -1:
raise ValueError("No motors detected. Please ensure you have one motor connected.")
print(f"Motor index found at: {motor_index}")
if baudrate != baudrate_des:
print(f"Setting its baudrate to {baudrate_des}")
baudrate_idx = list(baudrate_table.values()).index(baudrate_des)
# The write can fail, so we allow retries
for _ in range(num_write_retry):
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Baud_Rate", baudrate_idx)
time.sleep(0.5)
motor_bus.set_bus_baudrate(baudrate_des)
try:
present_baudrate_idx = motor_bus.read_with_motor_ids(
motor_bus.motor_models, motor_index, "Baud_Rate"
)
except ConnectionError:
print("Failed to write baudrate. Retrying.")
motor_bus.set_bus_baudrate(baudrate)
continue
break
else:
raise OSError("Failed to write baudrate.")
if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.")
print(f"Setting its index to desired index {motor_idx_des}")
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "ID", motor_idx_des)
present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID")
if present_idx != motor_idx_des:
raise OSError("Failed to write index.")
except Exception as e:
print(f"Error occurred during motor configuration: {e}")
finally:
# Disconnect the motor bus
motor_bus.disconnect()
print("Disconnected from motor bus.")
if __name__ == "__main__":
# Set up the argument parser
parser = argparse.ArgumentParser(
description="This script is used to configure a single motor at a time to the ID and baudrate you desire."
)
parser.add_argument("--brand", type=str, required=True, help="Motor brand (e.g., dynamixel, feetech)")
parser.add_argument("--model", type=str, required=True, help="Motor model (e.g., xl330-m077, sts3215)")
parser.add_argument("--ID", type=int, required=True, help="Desired ID of the current motor (e.g., 1)")
parser.add_argument(
"--baudrate", type=int, default=1000000, help="Desired baudrate for the motor (default: 1000000)"
)
# Parse arguments
args = parser.parse_args()
# Call the configure_motor function with the parsed arguments
configure_motor(args.brand, args.model, args.ID, args.baudrate)