forked from tangger/lerobot
WIP to remove
This commit is contained in:
@@ -82,6 +82,7 @@ SCS_SERIES_CONTROL_TABLE = {
|
|||||||
"Goal_Position": (42, 2),
|
"Goal_Position": (42, 2),
|
||||||
"Goal_Time": (44, 2),
|
"Goal_Time": (44, 2),
|
||||||
"Goal_Speed": (46, 2),
|
"Goal_Speed": (46, 2),
|
||||||
|
"Torque_Limit": (48, 2),
|
||||||
"Lock": (55, 1),
|
"Lock": (55, 1),
|
||||||
"Present_Position": (56, 2),
|
"Present_Position": (56, 2),
|
||||||
"Present_Speed": (58, 2),
|
"Present_Speed": (58, 2),
|
||||||
@@ -296,6 +297,18 @@ class FeetechMotorsBus:
|
|||||||
self.group_writers = {}
|
self.group_writers = {}
|
||||||
self.logs = {}
|
self.logs = {}
|
||||||
|
|
||||||
|
self.track_positions = {}
|
||||||
|
self.present_pos = {
|
||||||
|
"prev": [None] * len(self.motor_names),
|
||||||
|
"below_zero": [None] * len(self.motor_names),
|
||||||
|
"above_max": [None] * len(self.motor_names),
|
||||||
|
}
|
||||||
|
self.goal_pos = {
|
||||||
|
"prev": [None] * len(self.motor_names),
|
||||||
|
"below_zero": [None] * len(self.motor_names),
|
||||||
|
"above_max": [None] * len(self.motor_names),
|
||||||
|
}
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
if self.is_connected:
|
if self.is_connected:
|
||||||
raise RobotDeviceAlreadyConnectedError(
|
raise RobotDeviceAlreadyConnectedError(
|
||||||
@@ -378,7 +391,8 @@ class FeetechMotorsBus:
|
|||||||
return [idx for idx, _ in self.motors.values()]
|
return [idx for idx, _ in self.motors.values()]
|
||||||
|
|
||||||
def set_calibration(self, calibration: dict[str, list]):
|
def set_calibration(self, calibration: dict[str, list]):
|
||||||
self.calibration = calibration
|
pass
|
||||||
|
#self.calibration = calibration
|
||||||
|
|
||||||
def apply_calibration_autocorrect(self, values: np.ndarray | list, motor_names: list[str] | None):
|
def apply_calibration_autocorrect(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||||
"""This function apply the calibration, automatically detects out of range errors for motors values and attempt to correct.
|
"""This function apply the calibration, automatically detects out of range errors for motors values and attempt to correct.
|
||||||
@@ -603,6 +617,44 @@ class FeetechMotorsBus:
|
|||||||
values = np.round(values).astype(np.int32)
|
values = np.round(values).astype(np.int32)
|
||||||
return values
|
return values
|
||||||
|
|
||||||
|
def avoid_rotation_reset(self, values, motor_names, data_name):
|
||||||
|
if data_name not in self.track_positions:
|
||||||
|
self.track_positions[data_name] = {
|
||||||
|
"prev": [None] * len(self.motor_names),
|
||||||
|
# Assume False at initialization
|
||||||
|
"below_zero": [False] * len(self.motor_names),
|
||||||
|
"above_max": [False] * len(self.motor_names),
|
||||||
|
}
|
||||||
|
|
||||||
|
track = self.track_positions[data_name]
|
||||||
|
|
||||||
|
if motor_names is None:
|
||||||
|
motor_names = self.motor_names
|
||||||
|
|
||||||
|
for i, name in enumerate(motor_names):
|
||||||
|
idx = self.motor_names.index(name)
|
||||||
|
|
||||||
|
if track["prev"][idx] is None:
|
||||||
|
track["prev"][idx] = values[i]
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Detect a full rotation occured
|
||||||
|
if abs(track["prev"][idx] - values[i]) > 2048:
|
||||||
|
|
||||||
|
# Position went below 0 and got reset to 4095
|
||||||
|
if track["prev"][idx] < values[i]:
|
||||||
|
# So we set negative value by adding a full rotation
|
||||||
|
values[i] -= 4096
|
||||||
|
|
||||||
|
# Position went above 4095 and got reset to 0
|
||||||
|
elif track["prev"][idx] > values[i]:
|
||||||
|
# So we add a full rotation
|
||||||
|
values[i] += 4096
|
||||||
|
|
||||||
|
track["prev"][idx] = values[i]
|
||||||
|
|
||||||
|
return values
|
||||||
|
|
||||||
def read_with_motor_ids(self, motor_models, motor_ids, data_name):
|
def read_with_motor_ids(self, motor_models, motor_ids, data_name):
|
||||||
return_list = True
|
return_list = True
|
||||||
if not isinstance(motor_ids, list):
|
if not isinstance(motor_ids, list):
|
||||||
@@ -615,7 +667,11 @@ class FeetechMotorsBus:
|
|||||||
for idx in motor_ids:
|
for idx in motor_ids:
|
||||||
group.addParam(idx)
|
group.addParam(idx)
|
||||||
|
|
||||||
comm = group.txRxPacket()
|
for _ in range(NUM_READ_RETRY):
|
||||||
|
comm = group.txRxPacket()
|
||||||
|
if comm == COMM_SUCCESS:
|
||||||
|
break
|
||||||
|
|
||||||
if comm != COMM_SUCCESS:
|
if comm != COMM_SUCCESS:
|
||||||
raise ConnectionError(
|
raise ConnectionError(
|
||||||
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
|
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
|
||||||
@@ -685,6 +741,11 @@ class FeetechMotorsBus:
|
|||||||
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
|
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
|
||||||
values = values.astype(np.int32)
|
values = values.astype(np.int32)
|
||||||
|
|
||||||
|
print(values)
|
||||||
|
|
||||||
|
if data_name in CALIBRATION_REQUIRED:
|
||||||
|
values = self.avoid_rotation_reset(values, motor_names, data_name)
|
||||||
|
|
||||||
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
|
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
|
||||||
values = self.apply_calibration_autocorrect(values, motor_names)
|
values = self.apply_calibration_autocorrect(values, motor_names)
|
||||||
|
|
||||||
@@ -737,6 +798,8 @@ class FeetechMotorsBus:
|
|||||||
|
|
||||||
values = np.array(values)
|
values = np.array(values)
|
||||||
|
|
||||||
|
print(values)
|
||||||
|
|
||||||
motor_ids = []
|
motor_ids = []
|
||||||
models = []
|
models = []
|
||||||
for name in motor_names:
|
for name in motor_names:
|
||||||
|
|||||||
@@ -49,9 +49,10 @@ def apply_drive_mode(position, drive_mode):
|
|||||||
|
|
||||||
|
|
||||||
def compute_nearest_rounded_position(position, models):
|
def compute_nearest_rounded_position(position, models):
|
||||||
delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models)
|
# delta_turn = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, models)
|
||||||
nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn
|
# nearest_pos = np.round(position.astype(float) / delta_turn) * delta_turn
|
||||||
return nearest_pos.astype(position.dtype)
|
# return nearest_pos.astype(position.dtype)
|
||||||
|
return position
|
||||||
|
|
||||||
|
|
||||||
def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||||
@@ -382,6 +383,8 @@ class ManipulatorRobot:
|
|||||||
self.set_koch_robot_preset()
|
self.set_koch_robot_preset()
|
||||||
elif self.robot_type == "aloha":
|
elif self.robot_type == "aloha":
|
||||||
self.set_aloha_robot_preset()
|
self.set_aloha_robot_preset()
|
||||||
|
elif self.robot_type == "so_100":
|
||||||
|
self.set_so_100_robot_preset()
|
||||||
else:
|
else:
|
||||||
warnings.warn(f"No preset found for robot type: {self.robot_type}", stacklevel=1)
|
warnings.warn(f"No preset found for robot type: {self.robot_type}", stacklevel=1)
|
||||||
|
|
||||||
@@ -397,6 +400,13 @@ class ManipulatorRobot:
|
|||||||
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
|
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
|
||||||
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper")
|
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper")
|
||||||
|
|
||||||
|
# Check both arms can be read
|
||||||
|
for name in self.follower_arms:
|
||||||
|
self.follower_arms[name].read("Present_Position")
|
||||||
|
for name in self.leader_arms:
|
||||||
|
self.leader_arms[name].read("Present_Position")
|
||||||
|
|
||||||
|
|
||||||
# Connect the cameras
|
# Connect the cameras
|
||||||
for name in self.cameras:
|
for name in self.cameras:
|
||||||
self.cameras[name].connect()
|
self.cameras[name].connect()
|
||||||
@@ -523,6 +533,23 @@ class ManipulatorRobot:
|
|||||||
stacklevel=1,
|
stacklevel=1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def set_so_100_robot_preset(self):
|
||||||
|
for name in self.follower_arms:
|
||||||
|
self.follower_arms[name].write("Mode", 0)
|
||||||
|
#self.follower_arms[name].write("P_Coefficient", 255, "shoulder_pan")
|
||||||
|
self.follower_arms[name].write("P_Coefficient", 32, "shoulder_pan")
|
||||||
|
#self.follower_arms[name].write("D_Coefficient", 230, "shoulder_pan")
|
||||||
|
self.follower_arms[name].write("D_Coefficient", 32, "shoulder_pan")
|
||||||
|
self.follower_arms[name].write("Acceleration", 254)
|
||||||
|
self.follower_arms[name].write("Minimum_Startup_Force", 16)
|
||||||
|
|
||||||
|
# for name in self.leader_arms:
|
||||||
|
# self.leader_arms[name].write("Max_Torque_Limit", 50, "gripper")
|
||||||
|
# self.leader_arms[name].write("Torque_Limit", 1000, "gripper")
|
||||||
|
# self.leader_arms[name].write("Torque_Enable", 1, "gripper")
|
||||||
|
# self.leader_arms[name].write("Goal_Position", 2048, "gripper")
|
||||||
|
|
||||||
|
|
||||||
def teleop_step(
|
def teleop_step(
|
||||||
self, record_data=False
|
self, record_data=False
|
||||||
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
||||||
|
|||||||
@@ -5,12 +5,12 @@ calibration_dir: .cache/calibration/so_100
|
|||||||
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
|
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
|
||||||
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
|
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
|
||||||
# the number of motors in your follower arms.
|
# the number of motors in your follower arms.
|
||||||
max_relative_target: 5
|
max_relative_target: null
|
||||||
|
|
||||||
leader_arms:
|
leader_arms:
|
||||||
main:
|
main:
|
||||||
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
||||||
port: /dev/tty.usbmodem585A0080521
|
port: /dev/tty.usbmodem585A0077581
|
||||||
motors:
|
motors:
|
||||||
# name: (index, model)
|
# name: (index, model)
|
||||||
shoulder_pan: [1, "sts3215"]
|
shoulder_pan: [1, "sts3215"]
|
||||||
@@ -23,7 +23,7 @@ leader_arms:
|
|||||||
follower_arms:
|
follower_arms:
|
||||||
main:
|
main:
|
||||||
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
||||||
port: /dev/tty.usbmodem585A0080971
|
port: /dev/tty.usbmodem585A0080521
|
||||||
motors:
|
motors:
|
||||||
# name: (index, model)
|
# name: (index, model)
|
||||||
shoulder_pan: [1, "sts3215"]
|
shoulder_pan: [1, "sts3215"]
|
||||||
@@ -50,4 +50,4 @@ cameras:
|
|||||||
# ~ Koch specific settings ~
|
# ~ Koch specific settings ~
|
||||||
# Sets the leader arm in torque mode with the gripper motor set to this angle. This makes it possible
|
# Sets the leader arm in torque mode with the gripper motor set to this angle. This makes it possible
|
||||||
# to squeeze the gripper and have it spring back to an open position on its own.
|
# to squeeze the gripper and have it spring back to an open position on its own.
|
||||||
gripper_open_degree: 35.156
|
# gripper_open_degree: 35.156
|
||||||
|
|||||||
@@ -1,35 +1,33 @@
|
|||||||
|
"""
|
||||||
|
This script configure a single motor at a time to a given ID and baudrate.
|
||||||
|
|
||||||
|
Example of usage:
|
||||||
|
```bash
|
||||||
|
python lerobot/scripts/configure_motor.py \
|
||||||
|
--port /dev/tty.usbmodem585A0080521 \
|
||||||
|
--brand feetech \
|
||||||
|
--model sts3215 \
|
||||||
|
--baudrate 1000000 \
|
||||||
|
--ID 1
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import importlib
|
import importlib
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
def configure_motor(brand, model, motor_idx_des, baudrate_des):
|
def configure_motor(port, brand, model, motor_idx_des, baudrate_des):
|
||||||
if brand == "feetech":
|
if brand == "feetech":
|
||||||
motor_bus_class = importlib.import_module(
|
from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus as motor_bus_class
|
||||||
"lerobot.common.robot_devices.motors.feetech"
|
from lerobot.common.robot_devices.motors.feetech import SCS_SERIES_BAUDRATE_TABLE as baudrate_table
|
||||||
).FeetechMotorsBus
|
from lerobot.common.robot_devices.motors.feetech import NUM_WRITE_RETRY as num_write_retry
|
||||||
baudrate_table = importlib.import_module(
|
from lerobot.common.robot_devices.motors.feetech import MODEL_BAUDRATE_TABLE as model_baud_rate_table
|
||||||
"lerobot.common.robot_devices.motors.feetech"
|
|
||||||
).SCS_SERIES_BAUDRATE_TABLE
|
|
||||||
num_write_retry = importlib.import_module(
|
|
||||||
"lerobot.common.robot_devices.motors.feetech"
|
|
||||||
).NUM_WRITE_RETRY
|
|
||||||
model_baud_rate_table = importlib.import_module(
|
|
||||||
"lerobot.common.robot_devices.motors.feetech"
|
|
||||||
).MODEL_BAUDRATE_TABLE
|
|
||||||
elif brand == "dynamixel":
|
elif brand == "dynamixel":
|
||||||
motor_bus_class = importlib.import_module(
|
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus as motor_bus_class
|
||||||
"lerobot.common.robot_devices.motors.dynamixel"
|
from lerobot.common.robot_devices.motors.dynamixel import X_SERIES_BAUDRATE_TABLE as baudrate_table
|
||||||
).DynamixelMotorsBus
|
from lerobot.common.robot_devices.motors.dynamixel import NUM_WRITE_RETRY as num_write_retry
|
||||||
baudrate_table = importlib.import_module(
|
from lerobot.common.robot_devices.motors.dynamixel import MODEL_BAUDRATE_TABLE as model_baud_rate_table
|
||||||
"lerobot.common.robot_devices.motors.dynamixel"
|
|
||||||
).X_SERIES_BAUDRATE_TABLE
|
|
||||||
num_write_retry = importlib.import_module(
|
|
||||||
"lerobot.common.robot_devices.motors.dynamixel"
|
|
||||||
).NUM_WRITE_RETRY
|
|
||||||
model_baud_rate_table = importlib.import_module(
|
|
||||||
"lerobot.common.robot_devices.motors.dynamixel"
|
|
||||||
).MODEL_BAUDRATE_TABLE
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Currently we do not support this motor brand: {brand}. We currently support feetech and dynamixel motors."
|
f"Currently we do not support this motor brand: {brand}. We currently support feetech and dynamixel motors."
|
||||||
@@ -48,7 +46,7 @@ def configure_motor(brand, model, motor_idx_des, baudrate_des):
|
|||||||
|
|
||||||
# Initialize the MotorBus with the correct port and motor configurations
|
# Initialize the MotorBus with the correct port and motor configurations
|
||||||
motor_bus = motor_bus_class(
|
motor_bus = motor_bus_class(
|
||||||
port="/dev/ttyACM0", motors={motor_name: (motor_index_arbitrary, motor_model)}
|
port=port, motors={motor_name: (motor_index_arbitrary, motor_model)}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Try to connect to the motor bus and handle any connection-specific errors
|
# Try to connect to the motor bus and handle any connection-specific errors
|
||||||
@@ -67,7 +65,7 @@ def configure_motor(brand, model, motor_idx_des, baudrate_des):
|
|||||||
|
|
||||||
for baudrate in all_baudrates:
|
for baudrate in all_baudrates:
|
||||||
motor_bus.set_bus_baudrate(baudrate)
|
motor_bus.set_bus_baudrate(baudrate)
|
||||||
present_ids = motor_bus.find_motor_indices()
|
present_ids = motor_bus.find_motor_indices(list(range(1,10)))
|
||||||
if len(present_ids) > 1:
|
if len(present_ids) > 1:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Error: More than one motor ID detected. This script is designed to only handle one motor at a time. Please disconnect all but one motor."
|
"Error: More than one motor ID detected. This script is designed to only handle one motor at a time. Please disconnect all but one motor."
|
||||||
@@ -109,36 +107,42 @@ def configure_motor(brand, model, motor_idx_des, baudrate_des):
|
|||||||
if present_baudrate_idx != baudrate_idx:
|
if present_baudrate_idx != baudrate_idx:
|
||||||
raise OSError("Failed to write baudrate.")
|
raise OSError("Failed to write baudrate.")
|
||||||
|
|
||||||
|
|
||||||
|
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "Lock", 0)
|
||||||
|
|
||||||
print(f"Setting its index to desired index {motor_idx_des}")
|
print(f"Setting its index to desired index {motor_idx_des}")
|
||||||
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "ID", motor_idx_des)
|
motor_bus.write_with_motor_ids(motor_bus.motor_models, motor_index, "ID", motor_idx_des)
|
||||||
|
|
||||||
present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID")
|
present_idx = motor_bus.read_with_motor_ids(motor_bus.motor_models, motor_idx_des, "ID")
|
||||||
if present_idx != motor_idx_des:
|
if present_idx != motor_idx_des:
|
||||||
raise OSError("Failed to write index.")
|
raise OSError("Failed to write index.")
|
||||||
|
|
||||||
|
if brand == "feetech":
|
||||||
|
motor_bus.write("Goal_Position", 2047)
|
||||||
|
time.sleep(4)
|
||||||
|
motor_bus.write("Offset", 2047)
|
||||||
|
time.sleep(4)
|
||||||
|
breakpoint()
|
||||||
|
motor_bus.read("Present_Position")
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error occurred during motor configuration: {e}")
|
print(f"Error occurred during motor configuration: {e}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Disconnect the motor bus
|
|
||||||
motor_bus.disconnect()
|
motor_bus.disconnect()
|
||||||
print("Disconnected from motor bus.")
|
print("Disconnected from motor bus.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Set up the argument parser
|
parser = argparse.ArgumentParser()
|
||||||
parser = argparse.ArgumentParser(
|
parser.add_argument("--port", type=str, required=True, help="Motors bus port (e.g. dynamixel,feetech)")
|
||||||
description="This script is used to configure a single motor at a time to the ID and baudrate you desire."
|
parser.add_argument("--brand", type=str, required=True, help="Motor brand (e.g. dynamixel,feetech)")
|
||||||
)
|
parser.add_argument("--model", type=str, required=True, help="Motor model (e.g. xl330-m077,sts3215)")
|
||||||
parser.add_argument("--brand", type=str, required=True, help="Motor brand (e.g., dynamixel, feetech)")
|
parser.add_argument("--ID", type=int, required=True, help="Desired ID of the current motor (e.g. 1,2,3)")
|
||||||
parser.add_argument("--model", type=str, required=True, help="Motor model (e.g., xl330-m077, sts3215)")
|
|
||||||
parser.add_argument("--ID", type=int, required=True, help="Desired ID of the current motor (e.g., 1)")
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--baudrate", type=int, default=1000000, help="Desired baudrate for the motor (default: 1000000)"
|
"--baudrate", type=int, default=1000000, help="Desired baudrate for the motor (default: 1000000)"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Parse arguments
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# Call the configure_motor function with the parsed arguments
|
configure_motor(args.port, args.brand, args.model, args.ID, args.baudrate)
|
||||||
configure_motor(args.brand, args.model, args.ID, args.baudrate)
|
|
||||||
|
|||||||
@@ -445,6 +445,8 @@ def record(
|
|||||||
# Using `with` to exist smoothly if an execption is raised.
|
# Using `with` to exist smoothly if an execption is raised.
|
||||||
futures = []
|
futures = []
|
||||||
num_image_writers = num_image_writers_per_camera * len(robot.cameras)
|
num_image_writers = num_image_writers_per_camera * len(robot.cameras)
|
||||||
|
if num_image_writers == 0:
|
||||||
|
num_image_writers = 1
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
|
||||||
# Start recording all episodes
|
# Start recording all episodes
|
||||||
while episode_index < num_episodes:
|
while episode_index < num_episodes:
|
||||||
|
|||||||
Reference in New Issue
Block a user