diff --git a/lerobot/common/robots/koch_follower/koch_follower.py b/lerobot/common/robots/koch_follower/koch_follower.py index d37f50ac8..a9ac1c82d 100644 --- a/lerobot/common/robots/koch_follower/koch_follower.py +++ b/lerobot/common/robots/koch_follower/koch_follower.py @@ -48,7 +48,7 @@ class KochFollower(Robot): def __init__(self, config: KochFollowerConfig): super().__init__(config) self.config = config - self.arm = DynamixelMotorsBus( + self.bus = DynamixelMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "xl430-w250", MotorNormMode.RANGE_M100_100), @@ -64,7 +64,7 @@ class KochFollower(Robot): @property def _motors_ft(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: @@ -83,7 +83,7 @@ class KochFollower(Robot): @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: """ @@ -93,7 +93,7 @@ class KochFollower(Robot): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -105,30 +105,30 @@ class KochFollower(Robot): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motors = ["shoulder_pan", "wrist_roll"] - unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors] + unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors] print( f"Move all joints except {full_turn_motors} sequentially through their entire " "ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for motor in full_turn_motors: range_mins[motor] = 0 range_maxes[motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -137,19 +137,19 @@ class KochFollower(Robot): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: - with self.arm.torque_disabled(): - self.arm.configure_motors() + with self.bus.torque_disabled(): + self.bus.configure_motors() # Use 'extended position mode' for all motors except gripper, because in joint mode the servos # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling # the arm, you could end up with a servo with a position 0 or 4095 at a crucial point - for motor in self.arm.motors: + for motor in self.bus.motors: if motor != "gripper": - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) # Use 'position control current based' for gripper to be limited by the limit of the current. For # the follower gripper, it means it can grasp an object without forcing too much even tho, its @@ -157,19 +157,19 @@ class KochFollower(Robot): # For the leader gripper, it means we can use it as a physical trigger, since we can force with # our finger to make it move, and it will move back to its original target position when we # release the force. - self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) + self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) # Set better PID values to close the gap between recorded states and actions # TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor - self.arm.write("Position_P_Gain", "elbow_flex", 1500) - self.arm.write("Position_I_Gain", "elbow_flex", 0) - self.arm.write("Position_D_Gain", "elbow_flex", 600) + self.bus.write("Position_P_Gain", "elbow_flex", 1500) + self.bus.write("Position_I_Gain", "elbow_flex", 0) + self.bus.write("Position_D_Gain", "elbow_flex", 600) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_observation(self) -> dict[str, Any]: if not self.is_connected: @@ -179,7 +179,7 @@ class KochFollower(Robot): # Read arm position start = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position") obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -214,19 +214,19 @@ class KochFollower(Robot): # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.sync_read("Present_Position") + present_pos = self.bus.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.sync_write("Goal_Position", goal_pos) + self.bus.sync_write("Goal_Position", goal_pos) return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect(self.config.disable_torque_on_disconnect) + self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() diff --git a/lerobot/common/robots/moss_follower/moss_follower.py b/lerobot/common/robots/moss_follower/moss_follower.py index 982e2d47a..ae4bdfb92 100644 --- a/lerobot/common/robots/moss_follower/moss_follower.py +++ b/lerobot/common/robots/moss_follower/moss_follower.py @@ -45,7 +45,7 @@ class MossRobot(Robot): def __init__(self, config: MossRobotConfig): super().__init__(config) self.config = config - self.arm = FeetechMotorsBus( + self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), @@ -61,7 +61,7 @@ class MossRobot(Robot): @property def _motors_ft(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: @@ -80,7 +80,7 @@ class MossRobot(Robot): @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: """ @@ -90,7 +90,7 @@ class MossRobot(Robot): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -103,29 +103,29 @@ class MossRobot(Robot): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motor = "wrist_roll" - unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor] + unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor] print( f"Move all joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) range_mins[full_turn_motor] = 0 range_maxes[full_turn_motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -134,26 +134,26 @@ class MossRobot(Robot): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: - with self.arm.torque_disabled(): - self.arm.configure_motors() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", motor, 16) + self.bus.write("P_Coefficient", motor, 16) # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", motor, 0) - self.arm.write("D_Coefficient", motor, 32) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 32) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_observation(self) -> dict[str, Any]: if not self.is_connected: @@ -161,7 +161,7 @@ class MossRobot(Robot): # Read arm position start = time.perf_counter() - obs_dict = self.arm.sync_read("Present_Position") + obs_dict = self.bus.sync_read("Present_Position") obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -196,19 +196,19 @@ class MossRobot(Robot): # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.sync_read("Present_Position") + present_pos = self.bus.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.sync_write("Goal_Position", goal_pos) + self.bus.sync_write("Goal_Position", goal_pos) return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect(self.config.disable_torque_on_disconnect) + self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() diff --git a/lerobot/common/robots/so100_follower/so100_follower.py b/lerobot/common/robots/so100_follower/so100_follower.py index 5f999ae58..bb9058ee5 100644 --- a/lerobot/common/robots/so100_follower/so100_follower.py +++ b/lerobot/common/robots/so100_follower/so100_follower.py @@ -45,7 +45,7 @@ class SO100Follower(Robot): def __init__(self, config: SO100FollowerConfig): super().__init__(config) self.config = config - self.arm = FeetechMotorsBus( + self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), @@ -61,7 +61,7 @@ class SO100Follower(Robot): @property def _motors_ft(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: @@ -80,7 +80,7 @@ class SO100Follower(Robot): @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: """ @@ -90,7 +90,7 @@ class SO100Follower(Robot): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -103,29 +103,29 @@ class SO100Follower(Robot): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motor = "wrist_roll" - unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor] + unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor] print( f"Move all joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) range_mins[full_turn_motor] = 0 range_maxes[full_turn_motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -134,26 +134,26 @@ class SO100Follower(Robot): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: - with self.arm.torque_disabled(): - self.arm.configure_motors() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", motor, 16) + self.bus.write("P_Coefficient", motor, 16) # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", motor, 0) - self.arm.write("D_Coefficient", motor, 32) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 32) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_observation(self) -> dict[str, Any]: if not self.is_connected: @@ -161,7 +161,7 @@ class SO100Follower(Robot): # Read arm position start = time.perf_counter() - obs_dict = self.arm.sync_read("Present_Position") + obs_dict = self.bus.sync_read("Present_Position") obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -196,19 +196,19 @@ class SO100Follower(Robot): # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.sync_read("Present_Position") + present_pos = self.bus.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.sync_write("Goal_Position", goal_pos) + self.bus.sync_write("Goal_Position", goal_pos) return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect(self.config.disable_torque_on_disconnect) + self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() diff --git a/lerobot/common/robots/so101_follower/so101_follower.py b/lerobot/common/robots/so101_follower/so101_follower.py index 6d555005c..da97d6ad0 100644 --- a/lerobot/common/robots/so101_follower/so101_follower.py +++ b/lerobot/common/robots/so101_follower/so101_follower.py @@ -45,7 +45,7 @@ class SO101Follower(Robot): def __init__(self, config: SO101FollowerConfig): super().__init__(config) self.config = config - self.arm = FeetechMotorsBus( + self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), @@ -61,7 +61,7 @@ class SO101Follower(Robot): @property def _motors_ft(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: @@ -80,7 +80,7 @@ class SO101Follower(Robot): @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: """ @@ -90,7 +90,7 @@ class SO101Follower(Robot): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -103,25 +103,25 @@ class SO101Follower(Robot): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() print( "Move all joints sequentially through their entire ranges " "of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion() + range_mins, range_maxes = self.bus.record_ranges_of_motion() self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -130,26 +130,26 @@ class SO101Follower(Robot): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: - with self.arm.torque_disabled(): - self.arm.configure_motors() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", motor, 16) + self.bus.write("P_Coefficient", motor, 16) # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", motor, 0) - self.arm.write("D_Coefficient", motor, 32) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 32) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_observation(self) -> dict[str, Any]: if not self.is_connected: @@ -157,7 +157,7 @@ class SO101Follower(Robot): # Read arm position start = time.perf_counter() - obs_dict = self.arm.sync_read("Present_Position") + obs_dict = self.bus.sync_read("Present_Position") obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -192,19 +192,19 @@ class SO101Follower(Robot): # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.sync_read("Present_Position") + present_pos = self.bus.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.sync_write("Goal_Position", goal_pos) + self.bus.sync_write("Goal_Position", goal_pos) return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect(self.config.disable_torque_on_disconnect) + self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() diff --git a/lerobot/common/robots/viperx/viperx.py b/lerobot/common/robots/viperx/viperx.py index 3a497113a..1c60d81c4 100644 --- a/lerobot/common/robots/viperx/viperx.py +++ b/lerobot/common/robots/viperx/viperx.py @@ -39,7 +39,7 @@ class ViperX(Robot): ): super().__init__(config) self.config = config - self.arm = DynamixelMotorsBus( + self.bus = DynamixelMotorsBus( port=self.config.port, motors={ "waist": Motor(1, "xm540-w270", MotorNormMode.RANGE_M100_100), @@ -57,7 +57,7 @@ class ViperX(Robot): @property def _motors_ft(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: @@ -76,7 +76,7 @@ class ViperX(Robot): @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: """ @@ -86,7 +86,7 @@ class ViperX(Robot): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -98,31 +98,31 @@ class ViperX(Robot): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motors = ["shoulder_pan", "wrist_roll"] - unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors] + unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors] print( f"Move all joints except {full_turn_motors} sequentially through their entire " "ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for motor in full_turn_motors: range_mins[motor] = 0 range_maxes[motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -131,36 +131,36 @@ class ViperX(Robot): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: - with self.arm.torque_disabled(): - self.arm.configure_motors() + with self.bus.torque_disabled(): + self.bus.configure_motors() # Set secondary/shadow ID for shoulder and elbow. These joints have two motors. # As a result, if only one of them is required to move to a certain position, # the other will follow. This is to avoid breaking the motors. - self.arm.write("Secondary_ID", "shoulder_shadow", 2) - self.arm.write("Secondary_ID", "elbow_shadow", 4) + self.bus.write("Secondary_ID", "shoulder_shadow", 2) + self.bus.write("Secondary_ID", "elbow_shadow", 4) # Set a velocity limit of 131 as advised by Trossen Robotics # TODO(aliberts): remove as it's actually useless in position control - self.arm.write("Velocity_Limit", 131) + self.bus.write("Velocity_Limit", 131) # Use 'extended position mode' for all motors except gripper, because in joint mode the servos # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling # the arm, you could end up with a servo with a position 0 or 4095 at a crucial point. # See: https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11 - for motor in self.arm.motors: + for motor in self.bus.motors: if motor != "gripper": - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) # Use 'position control current based' for follower gripper to be limited by the limit of the # current. It can grasp an object without forcing too much even tho, it's goal position is a # complete grasp (both gripper fingers are ordered to join and reach a touch). - self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) + self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) def get_observation(self) -> dict[str, Any]: """The returned observations do not have a batch dimension.""" @@ -171,7 +171,7 @@ class ViperX(Robot): # Read arm position start = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position") obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -206,19 +206,19 @@ class ViperX(Robot): # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.sync_read("Present_Position") + present_pos = self.bus.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.sync_write("Goal_Position", goal_pos) + self.bus.sync_write("Goal_Position", goal_pos) return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect(self.config.disable_torque_on_disconnect) + self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() diff --git a/lerobot/common/teleoperators/koch_leader/koch_leader.py b/lerobot/common/teleoperators/koch_leader/koch_leader.py index 8f5ac4576..6821e0200 100644 --- a/lerobot/common/teleoperators/koch_leader/koch_leader.py +++ b/lerobot/common/teleoperators/koch_leader/koch_leader.py @@ -44,7 +44,7 @@ class KochLeader(Teleoperator): def __init__(self, config: KochLeaderConfig): super().__init__(config) self.config = config - self.arm = DynamixelMotorsBus( + self.bus = DynamixelMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "xl330-m077", MotorNormMode.RANGE_M100_100), @@ -59,7 +59,7 @@ class KochLeader(Teleoperator): @property def action_features(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def feedback_features(self) -> dict[str, type]: @@ -67,13 +67,13 @@ class KochLeader(Teleoperator): @property def is_connected(self) -> bool: - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -82,33 +82,33 @@ class KochLeader(Teleoperator): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) - self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) - drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.arm.motors} + self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) + drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors} input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motors = ["shoulder_pan", "wrist_roll"] - unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors] + unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors] print( f"Move all joints except {full_turn_motors} sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for motor in full_turn_motors: range_mins[motor] = 0 range_maxes[motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=drive_modes[motor], @@ -117,43 +117,43 @@ class KochLeader(Teleoperator): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: - self.arm.disable_torque() - self.arm.configure_motors() - for motor in self.arm.motors: + self.bus.disable_torque() + self.bus.configure_motors() + for motor in self.bus.motors: if motor != "gripper": # Use 'extended position mode' for all motors except gripper, because in joint mode the servos # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while # assembling the arm, you could end up with a servo with a position 0 or 4095 at a crucial # point - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) # Use 'position control current based' for gripper to be limited by the limit of the current. # For the follower gripper, it means it can grasp an object without forcing too much even tho, # its goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). # For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger # to make it move, and it will move back to its original target position when we release the force. - self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) + self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) # Set gripper's goal pos in current position mode so that we can use it as a trigger. - self.arm.enable_torque("gripper") - self.arm.write("Goal_Position", "gripper", self.config.gripper_open_pos) + self.bus.enable_torque("gripper") + self.bus.write("Goal_Position", "gripper", self.config.gripper_open_pos) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_action(self) -> dict[str, float]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") start = time.perf_counter() - action = self.arm.sync_read("Present_Position") + action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") @@ -167,5 +167,5 @@ class KochLeader(Teleoperator): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect() + self.bus.disconnect() logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/so100_leader/so100_leader.py b/lerobot/common/teleoperators/so100_leader/so100_leader.py index a063edd1f..900346ad5 100644 --- a/lerobot/common/teleoperators/so100_leader/so100_leader.py +++ b/lerobot/common/teleoperators/so100_leader/so100_leader.py @@ -41,7 +41,7 @@ class SO100Leader(Teleoperator): def __init__(self, config: SO100LeaderConfig): super().__init__(config) self.config = config - self.arm = FeetechMotorsBus( + self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), @@ -56,7 +56,7 @@ class SO100Leader(Teleoperator): @property def action_features(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def feedback_features(self) -> dict[str, type]: @@ -64,13 +64,13 @@ class SO100Leader(Teleoperator): @property def is_connected(self) -> bool: - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -79,29 +79,29 @@ class SO100Leader(Teleoperator): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motor = "wrist_roll" - unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor] + unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor] print( f"Move all joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) range_mins[full_turn_motor] = 0 range_maxes[full_turn_motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -110,25 +110,25 @@ class SO100Leader(Teleoperator): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: - self.arm.disable_torque() - self.arm.configure_motors() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_action(self) -> dict[str, float]: start = time.perf_counter() - action = self.arm.sync_read("Present_Position") + action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") @@ -142,5 +142,5 @@ class SO100Leader(Teleoperator): if not self.is_connected: DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect() + self.bus.disconnect() logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/so101_leader/so101_leader.py b/lerobot/common/teleoperators/so101_leader/so101_leader.py index 00c82304a..34ad31daf 100644 --- a/lerobot/common/teleoperators/so101_leader/so101_leader.py +++ b/lerobot/common/teleoperators/so101_leader/so101_leader.py @@ -41,7 +41,7 @@ class SO101Leader(Teleoperator): def __init__(self, config: SO101LeaderConfig): super().__init__(config) self.config = config - self.arm = FeetechMotorsBus( + self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), @@ -56,7 +56,7 @@ class SO101Leader(Teleoperator): @property def action_features(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def feedback_features(self) -> dict[str, type]: @@ -64,13 +64,13 @@ class SO101Leader(Teleoperator): @property def is_connected(self) -> bool: - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -79,25 +79,25 @@ class SO101Leader(Teleoperator): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() print( "Move all joints sequentially through their entire ranges " "of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion() + range_mins, range_maxes = self.bus.record_ranges_of_motion() self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, @@ -106,25 +106,25 @@ class SO101Leader(Teleoperator): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: - self.arm.disable_torque() - self.arm.configure_motors() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) def setup_motors(self) -> None: - for motor in reversed(self.arm.motors): + for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.arm.setup_motor(motor) - print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_action(self) -> dict[str, float]: start = time.perf_counter() - action = self.arm.sync_read("Present_Position") + action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") @@ -138,5 +138,5 @@ class SO101Leader(Teleoperator): if not self.is_connected: DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect() + self.bus.disconnect() logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/widowx/widowx.py b/lerobot/common/teleoperators/widowx/widowx.py index 4b09f6d07..209f7df1d 100644 --- a/lerobot/common/teleoperators/widowx/widowx.py +++ b/lerobot/common/teleoperators/widowx/widowx.py @@ -42,7 +42,7 @@ class WidowX(Teleoperator): def __init__(self, config: WidowXConfig): super().__init__(config) self.config = config - self.arm = DynamixelMotorsBus( + self.bus = DynamixelMotorsBus( port=self.config.port, motors={ "waist": Motor(1, "xm430-w350", MotorNormMode.RANGE_M100_100), @@ -59,7 +59,7 @@ class WidowX(Teleoperator): @property def action_features(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.arm.motors} + return {f"{motor}.pos": float for motor in self.bus.motors} @property def feedback_features(self) -> dict[str, type]: @@ -67,13 +67,13 @@ class WidowX(Teleoperator): @property def is_connected(self) -> bool: - return self.arm.is_connected + return self.bus.is_connected def connect(self, calibrate: bool = True): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -82,34 +82,34 @@ class WidowX(Teleoperator): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch) logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for motor in self.arm.motors: - self.arm.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) - self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) - drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.arm.motors} + self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) + drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors} input("Move robot to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.bus.set_half_turn_homings() full_turn_motors = ["shoulder_pan", "wrist_roll"] - unknown_range_motors = [motor for motor in self.arm.motors if motor not in full_turn_motors] + unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors] print( f"Move all joints except {full_turn_motors} sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for motor in full_turn_motors: range_mins[motor] = 0 range_maxes[motor] = 4095 self.calibration = {} - for motor, m in self.arm.motors.items(): + for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=drive_modes[motor], @@ -118,26 +118,26 @@ class WidowX(Teleoperator): range_max=range_maxes[motor], ) - self.arm.write_calibration(self.calibration) + self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: - self.arm.disable_torque() - self.arm.configure_motors() + self.bus.disable_torque() + self.bus.configure_motors() # Set secondary/shadow ID for shoulder and elbow. These joints have two motors. # As a result, if only one of them is required to move to a certain position, # the other will follow. This is to avoid breaking the motors. - self.arm.write("Secondary_ID", "shoulder_shadow", 2) - self.arm.write("Secondary_ID", "elbow_shadow", 4) + self.bus.write("Secondary_ID", "shoulder_shadow", 2) + self.bus.write("Secondary_ID", "elbow_shadow", 4) def get_action(self) -> dict[str, float]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") start = time.perf_counter() - action = self.arm.sync_read("Present_Position") + action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") @@ -150,5 +150,5 @@ class WidowX(Teleoperator): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect() + self.bus.disconnect() logger.info(f"{self} disconnected.")