import logging import pickle import time from dataclasses import dataclass, field, replace from pathlib import Path from typing import Sequence import numpy as np import torch from lerobot.common.robot_devices.cameras.utils import Camera from lerobot.common.robot_devices.motors.dynamixel import ( OperatingMode, TorqueMode, convert_degrees_to_steps, ) from lerobot.common.robot_devices.motors.utils import MotorsBus from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError ######################################################################## # Calibration logic ######################################################################## URL_TEMPLATE = ( "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp" ) # In nominal degree range ]-180, +180[ ZERO_POSITION_DEGREE = 0 ROTATED_POSITION_DEGREE = 90 def assert_drive_mode(drive_mode): # `drive_mode` is in [0,1] with 0 means original rotation direction for the motor, and 1 means inverted. if not np.all(np.isin(drive_mode, [0, 1])): raise ValueError(f"`drive_mode` contains values other than 0 or 1: ({drive_mode})") def apply_drive_mode(position, drive_mode): assert_drive_mode(drive_mode) # Convert `drive_mode` from [0, 1] with 0 indicates original rotation direction and 1 inverted, # to [-1, 1] with 1 indicates original rotation direction and -1 inverted. signed_drive_mode = -(drive_mode * 2 - 1) position *= signed_drive_mode return position def reset_torque_mode(arm: MotorsBus): # To be configured, all servos must be in "torque disable" mode arm.write("Torque_Enable", TorqueMode.DISABLED.value) # Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't # rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm, # you could end up with a servo with a position 0 or 4095 at a crucial point See [ # https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11] all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"] if len(all_motors_except_gripper) > 0: arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper) # Use 'position control current based' for gripper to be limited by the limit of the current. # For the follower gripper, it means it can grasp an object without forcing too much even tho, # it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). # For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger # to make it move, and it will move back to its original target position when we release the force. arm.write("Operating_Mode", OperatingMode.CURRENT_CONTROLLED_POSITION.value, "gripper") def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str): """This function ensures that a neural network trained on data collected on a given robot can work on another robot. For instance before calibration, setting a same goal position for each motor of two different robots will get two very different positions. But after calibration, the two robots will move to the same position.To this end, this function computes the homing offset and the drive mode for each motor of a given robot. Homing offset is used to shift the motor position to a ]-2048, +2048[ nominal range (when the motor uses 2048 steps to complete a half a turn). This range is set around an arbitrary "zero position" corresponding to all motor positions being 0. During the calibration process, you will need to manually move the robot to this "zero position". Drive mode is used to invert the rotation direction of the motor. This is useful when some motors have been assembled in the opposite orientation for some robots. During the calibration process, you will need to manually move the robot to the "rotated position". After calibration, the homing offsets and drive modes are stored in a cache. Example of usage: ```python run_arm_calibration(arm, "left", "follower") ``` """ reset_torque_mode(arm) print(f"\nRunning calibration of {name} {arm_type}...") print("\nMove arm to zero position") print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="zero")) input("Press Enter to continue...") # We arbitrarely choosed our zero target position to be a straight horizontal position with gripper upwards and closed. # It is easy to identify and all motors are in a "quarter turn" position. Once calibration is done, this position will # corresponds to every motor angle being 0. If you set all 0 as Goal Position, the arm will move in this position. zero_position = convert_degrees_to_steps(ZERO_POSITION_DEGREE, arm.motor_models) def _compute_nearest_rounded_position(position, models): # TODO(rcadene): Rework this function since some motors cant physically rotate a quarter turn # (e.g. the gripper of Aloha arms can only rotate ~50 degree) quarter_turn_degree = 90 quarter_turn = convert_degrees_to_steps(quarter_turn_degree, models) nearest_pos = np.round(position.astype(float) / quarter_turn) * quarter_turn return nearest_pos.astype(position.dtype) # Compute homing offset so that `present_position + homing_offset ~= target_position`. position = arm.read("Present_Position") position = _compute_nearest_rounded_position(position, arm.motor_models) homing_offset = zero_position - position print("\nMove arm to rotated target position") print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rotated")) input("Press Enter to continue...") # The rotated target position corresponds to a rotation of a quarter turn from the zero position. # This allows to identify the rotation direction of each motor. # For instance, if the motor rotates 90 degree, and its value is -90 after applying the homing offset, then we know its rotation direction # is inverted. However, for the calibration being successful, we need everyone to follow the same target position. # Sometimes, there is only one possible rotation direction. For instance, if the gripper is closed, there is only one direction which # corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarely rotate clockwise from the point of view # of the previous motor in the kinetic chain. rotated_position = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models) # Find drive mode by rotating each motor by a quarter of a turn. # Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0). position = arm.read("Present_Position") position += homing_offset position = _compute_nearest_rounded_position(position, arm.motor_models) drive_mode = (position != rotated_position).astype(np.int32) # Re-compute homing offset to take into account drive mode position = arm.read("Present_Position") position = apply_drive_mode(position, drive_mode) position = _compute_nearest_rounded_position(position, arm.motor_models) homing_offset = rotated_position - position print("\nMove arm to rest position") print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rest")) input("Press Enter to continue...") print() return homing_offset, drive_mode ######################################################################## # Alexander Koch robot arm ######################################################################## @dataclass class KochRobotConfig: """ Example of usage: ```python KochRobotConfig() ``` """ # Define all components of the robot leader_arms: dict[str, MotorsBus] = field(default_factory=lambda: {}) follower_arms: dict[str, MotorsBus] = field(default_factory=lambda: {}) cameras: dict[str, Camera] = field(default_factory=lambda: {}) # Optionally limit the magnitude of the relative positional target vector for safety purposes. # Set this to a positive scalar to have the same value for all motors, or a list that is the same length # as the number of motors in your follower arms (assumes all follower arms have the same number of # motors). max_relative_target: list[float] | float | None = None # Optionally set the leader arm in torque mode with the gripper motor set to this angle. This makes it # possible to squeeze the gripper and have it spring back to an open position on its own. If None, the # gripper is not put in torque mode. gripper_open_degree: float | None = None def __setattr__(self, prop: str, val): if prop == "max_relative_target" and val is not None and isinstance(val, Sequence): for name in self.follower_arms: if len(self.follower_arms[name].motors) != len(val): raise ValueError( f"len(max_relative_target)={len(val)} but the follower arm with name {name} has " f"{len(self.follower_arms[name].motors)} motors. Please make sure that the " f"`max_relative_target` list has as many parameters as there are motors per arm. " "Note: This feature does not yet work with robots where different follower arms have " "different numbers of motors." ) super().__setattr__(prop, val) class KochRobot: # TODO(rcadene): Implement force feedback """This class allows to control any Koch robot of various number of motors. A few versions are available: - [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, which was developed by Alexander Koch from [Tau Robotics](https://tau-robotics.com): [Github for sourcing and assembly]( - [Koch v1.1])https://github.com/jess-moss/koch-v1-1), which was developed by Jess Moss. Example of highest frequency teleoperation without camera: ```python # Defines how to communicate with the motors of the leader and follower arms leader_arms = { "main": DynamixelMotorsBus( port="/dev/tty.usbmodem575E0031751", motors={ # name: (index, model) "shoulder_pan": (1, "xl330-m077"), "shoulder_lift": (2, "xl330-m077"), "elbow_flex": (3, "xl330-m077"), "wrist_flex": (4, "xl330-m077"), "wrist_roll": (5, "xl330-m077"), "gripper": (6, "xl330-m077"), }, ), } follower_arms = { "main": DynamixelMotorsBus( port="/dev/tty.usbmodem575E0032081", motors={ # name: (index, model) "shoulder_pan": (1, "xl430-w250"), "shoulder_lift": (2, "xl430-w250"), "elbow_flex": (3, "xl330-m288"), "wrist_flex": (4, "xl330-m288"), "wrist_roll": (5, "xl330-m288"), "gripper": (6, "xl330-m288"), }, ), } robot = KochRobot( leader_arms=leader_arms, follower_arms=follower_arms, ) # Connect motors buses and cameras if any (Required) robot.connect() while True: robot.teleop_step() ``` Example of highest frequency data collection without camera: ```python # Assumes leader and follower arms have been instantiated already (see first example) robot = KochRobot( leader_arms=leader_arms, follower_arms=follower_arms, ) robot.connect() while True: observation, action = robot.teleop_step(record_data=True) ``` Example of highest frequency data collection with cameras: ```python # Defines how to communicate with 2 cameras connected to the computer. # Here, the webcam of the laptop and the phone (connected in USB to the laptop) # can be reached respectively using the camera indices 0 and 1. These indices can be # arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices. cameras = { "laptop": OpenCVCamera(camera_index=0, fps=30, width=640, height=480), "phone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480), } # Assumes leader and follower arms have been instantiated already (see first example) robot = KochRobot( leader_arms=leader_arms, follower_arms=follower_arms, cameras=cameras, ) robot.connect() while True: observation, action = robot.teleop_step(record_data=True) ``` Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency): ```python # Assumes leader and follower arms + cameras have been instantiated already (see previous example) robot = KochRobot( leader_arms=leader_arms, follower_arms=follower_arms, cameras=cameras, ) robot.connect() while True: # Uses the follower arms and cameras to capture an observation observation = robot.capture_observation() # Assumes a policy has been instantiated with torch.inference_mode(): action = policy.select_action(observation) # Orders the robot to move robot.send_action(action) ``` Example of disconnecting which is not mandatory since we disconnect when the object is deleted: ```python robot.disconnect() ``` """ def __init__( self, config: KochRobotConfig | None = None, calibration_path: Path = ".cache/calibration/koch.pkl", **kwargs, ): if config is None: config = KochRobotConfig() # Overwrite config arguments using kwargs self.config = replace(config, **kwargs) self.calibration_path = Path(calibration_path) self.leader_arms = self.config.leader_arms self.follower_arms = self.config.follower_arms self.cameras = self.config.cameras self.is_connected = False self.logs = {} def connect(self): if self.is_connected: raise RobotDeviceAlreadyConnectedError( "KochRobot is already connected. Do not run `robot.connect()` twice." ) if not self.leader_arms and not self.follower_arms and not self.cameras: raise ValueError( "KochRobot doesn't have any device to connect. See example of usage in docstring of the class." ) # Connect the arms for name in self.follower_arms: print(f"Connecting {name} follower arm.") self.follower_arms[name].connect() print(f"Connecting {name} leader arm.") self.leader_arms[name].connect() # Reset the arms and load or run calibration if self.calibration_path.exists(): # Reset all arms before setting calibration for name in self.follower_arms: reset_torque_mode(self.follower_arms[name]) for name in self.leader_arms: reset_torque_mode(self.leader_arms[name]) with open(self.calibration_path, "rb") as f: calibration = pickle.load(f) else: print(f"Missing calibration file '{self.calibration_path}'. Starting calibration precedure.") # Run calibration process which begins by reseting all arms calibration = self.run_calibration() print(f"Calibration is done! Saving calibration file '{self.calibration_path}'") self.calibration_path.parent.mkdir(parents=True, exist_ok=True) with open(self.calibration_path, "wb") as f: pickle.dump(calibration, f) # Set calibration for name in self.follower_arms: self.follower_arms[name].set_calibration(calibration[f"follower_{name}"]) for name in self.leader_arms: self.leader_arms[name].set_calibration(calibration[f"leader_{name}"]) # Set better PID values to close the gap between recored states and actions # TODO(rcadene): Implement an automatic procedure to set optimial PID values for each motor for name in self.follower_arms: self.follower_arms[name].write("Position_P_Gain", 1500, "elbow_flex") self.follower_arms[name].write("Position_I_Gain", 0, "elbow_flex") self.follower_arms[name].write("Position_D_Gain", 600, "elbow_flex") # Enable torque on all motors of the follower arms for name in self.follower_arms: print(f"Activating torque on {name} follower arm.") self.follower_arms[name].write("Torque_Enable", 1) if self.config.gripper_open_degree is not None: # Set the leader arm in torque mode with the gripper motor set to an angle. This makes it possible # to squeeze the gripper and have it spring back to an open position on its own. for name in self.leader_arms: self.leader_arms[name].write("Torque_Enable", 1, "gripper") self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper") # Connect the cameras for name in self.cameras: self.cameras[name].connect() self.is_connected = True def run_calibration(self): calibration = {} for name in self.follower_arms: homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], name, "follower") calibration[f"follower_{name}"] = {} for idx, motor_name in enumerate(self.follower_arms[name].motor_names): calibration[f"follower_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx]) for name in self.leader_arms: homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], name, "leader") calibration[f"leader_{name}"] = {} for idx, motor_name in enumerate(self.leader_arms[name].motor_names): calibration[f"leader_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx]) return calibration def teleop_step( self, record_data=False ) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: if not self.is_connected: raise RobotDeviceNotConnectedError( "KochRobot is not connected. You need to run `robot.connect()`." ) # Prepare to assign the position of the leader to the follower leader_pos = {} for name in self.leader_arms: before_lread_t = time.perf_counter() leader_pos[name] = self.leader_arms[name].read("Present_Position") self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t follower_goal_pos = {} for name in self.leader_arms: follower_goal_pos[name] = leader_pos[name] # Send action for name in self.follower_arms: before_fwrite_t = time.perf_counter() self.send_action(torch.tensor(follower_goal_pos[name]), [name]) self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t # Early exit when recording data is not requested if not record_data: return # TODO(rcadene): Add velocity and other info # Read follower position follower_pos = {} for name in self.follower_arms: before_fread_t = time.perf_counter() follower_pos[name] = self.follower_arms[name].read("Present_Position") self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t # Create state by concatenating follower current position state = [] for name in self.follower_arms: if name in follower_pos: state.append(follower_pos[name]) state = np.concatenate(state) # Create action by concatenating follower goal position action = [] for name in self.follower_arms: if name in follower_goal_pos: action.append(follower_goal_pos[name]) action = np.concatenate(action) # Capture images from cameras images = {} for name in self.cameras: before_camread_t = time.perf_counter() images[name] = self.cameras[name].async_read() self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t # Populate output dictionnaries and format to pytorch obs_dict, action_dict = {}, {} obs_dict["observation.state"] = torch.from_numpy(state) action_dict["action"] = torch.from_numpy(action) for name in self.cameras: obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name]) return obs_dict, action_dict def capture_observation(self): """The returned observations do not have a batch dimension.""" if not self.is_connected: raise RobotDeviceNotConnectedError( "KochRobot is not connected. You need to run `robot.connect()`." ) # Read follower position follower_pos = {} for name in self.follower_arms: before_fread_t = time.perf_counter() follower_pos[name] = self.follower_arms[name].read("Present_Position") self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t # Create state by concatenating follower current position state = [] for name in self.follower_arms: if name in follower_pos: state.append(follower_pos[name]) state = np.concatenate(state) # Capture images from cameras images = {} for name in self.cameras: before_camread_t = time.perf_counter() images[name] = self.cameras[name].async_read() self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t # Populate output dictionnaries and format to pytorch obs_dict = {} obs_dict["observation.state"] = torch.from_numpy(state) for name in self.cameras: obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name]) return obs_dict def send_action(self, action: torch.Tensor, follower_names: list[str] | None = None): """Command the follower arms to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter `max_relative_target`. Args: action: tensor containing the concatenated joint positions for the follower arms. follower_names: Pass follower arm names to only control a subset of all the follower arms. """ if not self.is_connected: raise RobotDeviceNotConnectedError( "KochRobot is not connected. You need to run `robot.connect()`." ) if follower_names is None: follower_names = list(self.follower_arms) elif not set(follower_names).issubset(self.follower_arms): raise ValueError( f"You provided {follower_names=} but only the following arms are registered: " f"{list(self.follower_arms)}" ) from_idx = 0 to_idx = 0 follower_goal_pos = {} for name in follower_names: to_idx += len(self.follower_arms[name].motor_names) this_action = action[from_idx:to_idx] if self.config.max_relative_target is not None: if not isinstance(self.config.max_relative_target, list): max_relative_target = [self.config.max_relative_target for _ in range(from_idx, to_idx)] max_relative_target = torch.tensor(self.config.max_relative_target) # Cap relative action target magnitude for safety. current_pos = torch.tensor(self.follower_arms[name].read("Present_Position")) diff = this_action - current_pos safe_diff = torch.minimum(diff, max_relative_target) safe_diff = torch.maximum(safe_diff, -max_relative_target) safe_action = current_pos + safe_diff if not torch.allclose(safe_action, action): logging.warning( "Relative action magnitude had to be clamped to be safe.\n" f" requested relative action target: {diff}\n" f" clamped relative action target: {safe_diff}" ) follower_goal_pos[name] = safe_action.numpy() from_idx = to_idx for name in self.follower_arms: self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32)) def disconnect(self): if not self.is_connected: raise RobotDeviceNotConnectedError( "KochRobot is not connected. You need to run `robot.connect()` before disconnecting." ) for name in self.follower_arms: self.follower_arms[name].disconnect() for name in self.leader_arms: self.leader_arms[name].disconnect() for name in self.cameras: self.cameras[name].disconnect() self.is_connected = False def __del__(self): if getattr(self, "is_connected", False): self.disconnect()