diff --git a/realman.yml b/realman.yml index a4101098..4ab738f5 100644 --- a/realman.yml +++ b/realman.yml @@ -1,5 +1,8 @@ type: realman port: 192.168.3.18:8080 +gripper_range: + - 10 + - 990 motors: joint_1: id: 1 diff --git a/src/lerobot/motors/realman/realman.py b/src/lerobot/motors/realman/realman.py index 5be11139..b64cae23 100644 --- a/src/lerobot/motors/realman/realman.py +++ b/src/lerobot/motors/realman/realman.py @@ -17,6 +17,7 @@ class RealmanMotorsBus(MotorsBus): self, port: str, motors:dict[str, Motor], + gripper_range:list[int], joint: list, calibration: dict[str, MotorCalibration] | None = None, ): @@ -25,6 +26,7 @@ class RealmanMotorsBus(MotorsBus): address = port.split(':') self.handle = self.rmarm.rm_create_robot_arm(address[0], int(address[1])) self.motors = motors + self.gripper_range = gripper_range self.init_joint_position = joint self.safe_disable_position = joint self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1) @@ -105,7 +107,7 @@ class RealmanMotorsBus(MotorsBus): """ 移动到初始位置 """ - self.write(target_joint=self.init_joint_position) + self.write_arm(target_joint=self.init_joint_position) def write(self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0): return False @@ -176,6 +178,10 @@ class RealmanMotorsBus(MotorsBus): def sync_write(self,name,actionData: dict[str, Any]): if name =="Goal_Position": self.write_arm(target_joint=actionData) + + def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0): + if data_name == "Present_Position": + return self.read() def _assert_protocol_is_compatible(self, instruction_name): return True @@ -185,14 +191,7 @@ class RealmanMotorsBus(MotorsBus): 配置电机 """ def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None: - for motor in self.motors: - # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on - # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). - self.write("Return_Delay_Time", motor, return_delay_time) - # Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors. - if self.protocol_version == 0: - self.write("Maximum_Acceleration", motor, maximum_acceleration) - self.write("Acceleration", motor, acceleration) + self.rmarm.rm_set_gripper_route(self.gripper_range[0],self.gripper_range[1]) def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0): for motor in self._get_motors_list(motors): @@ -207,30 +206,32 @@ class RealmanMotorsBus(MotorsBus): @property def is_calibrated(self) -> bool: - motors_calibration = self.read_calibration() - if set(motors_calibration) != set(self.calibration): - return False + return True + # motors_calibration = self.read_calibration() + # if set(motors_calibration) != set(self.calibration): + # return False - same_ranges = all( - self.calibration[motor].range_min == cal.range_min - and self.calibration[motor].range_max == cal.range_max - for motor, cal in motors_calibration.items() - ) - if self.protocol_version == 1: - return same_ranges + # same_ranges = all( + # self.calibration[motor].range_min == cal.range_min + # and self.calibration[motor].range_max == cal.range_max + # for motor, cal in motors_calibration.items() + # ) + # if self.protocol_version == 1: + # return same_ranges - same_offsets = all( - self.calibration[motor].homing_offset == cal.homing_offset - for motor, cal in motors_calibration.items() - ) - return same_ranges and same_offsets + # same_offsets = all( + # self.calibration[motor].homing_offset == cal.homing_offset + # for motor, cal in motors_calibration.items() + # ) + # return same_ranges and same_offsets def write_calibration(self, calibration_dict, cache = True): - for motor, calibration in calibration_dict.items(): - self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min) - self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max) - if cache: - self.calibration = calibration_dict + pass + # for motor, calibration in calibration_dict.items(): + # self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min) + # self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max) + # if cache: + # self.calibration = calibration_dict def _get_half_turn_homings(self, positions): offsets = {} diff --git a/src/lerobot/robots/realman/realman_robot.py b/src/lerobot/robots/realman/realman_robot.py index 29604705..13a1f8fb 100644 --- a/src/lerobot/robots/realman/realman_robot.py +++ b/src/lerobot/robots/realman/realman_robot.py @@ -17,11 +17,11 @@ class RealmanRobot(Robot): def __init__(self, config: RealmanRobotConfig): super().__init__(config) self.config = config - print(config) self.bus = RealmanMotorsBus( port=self.config.port, motors=config.motors, joint=config.joint, + gripper_range=config.gripper_range, calibration=self.calibration, ) self.cameras = make_cameras_from_configs(config.cameras) @@ -56,15 +56,15 @@ class RealmanRobot(Robot): raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() + + for cam in self.cameras.values(): + cam.connect() + if not self.is_calibrated and calibrate: logger.info( "Mismatch between calibration values in the motor and the calibration file or no calibration file found" ) self.calibrate() - - for cam in self.cameras.values(): - cam.connect() - self.configure() logger.info(f"{self} connected.") @@ -73,69 +73,18 @@ class RealmanRobot(Robot): return self.bus.is_calibrated def calibrate(self) -> None: - if self.calibration: - # Calibration file exists, ask user whether to use it or run new calibration - user_input = input( - f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: " - ) - if user_input.strip().lower() != "c": - logger.info(f"Writing calibration file associated with the id {self.id} to the motors") - self.bus.write_calibration(self.calibration) - return - - logger.info(f"\nRunning calibration of {self}") - self.bus.disable_torque() - for motor in self.bus.motors: - self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) - - input(f"Move {self} to the middle of its range of motion and press ENTER....") - homing_offsets = self.bus.set_half_turn_homings() - - full_turn_motor = "wrist_roll" - unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor] - print( - f"Move all joints except '{full_turn_motor}' sequentially through their " - "entire ranges of motion.\nRecording positions. Press ENTER to stop..." - ) - range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) - range_mins[full_turn_motor] = 0 - range_maxes[full_turn_motor] = 4095 - - self.calibration = {} - for motor, m in self.bus.motors.items(): - self.calibration[motor] = MotorCalibration( - id=m.id, - drive_mode=0, - homing_offset=homing_offsets[motor], - range_min=range_mins[motor], - range_max=range_maxes[motor], - ) - - self.bus.write_calibration(self.calibration) - self._save_calibration() - print("Calibration saved to", self.calibration_fpath) + """move piper to the home position""" + if not self.is_connected: + raise ConnectionError() + + self.bus.apply_calibration() def configure(self) -> None: with self.bus.torque_disabled(): self.bus.configure_motors() - for motor in self.bus.motors: - self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) - # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.bus.write("P_Coefficient", motor, 16) - # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.bus.write("I_Coefficient", motor, 0) - self.bus.write("D_Coefficient", motor, 32) - - if motor == "gripper": - self.bus.write("Max_Torque_Limit", motor, 500) # 50% of max torque to avoid burnout - self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout - self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded def setup_motors(self) -> None: - for motor in reversed(self.bus.motors): - input(f"Connect the controller board to the '{motor}' motor only and press enter.") - self.bus.setup_motor(motor) - print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + pass def get_observation(self) -> dict[str, Any]: if not self.is_connected: diff --git a/src/lerobot/robots/realman/realman_robot_config.py b/src/lerobot/robots/realman/realman_robot_config.py index 6e9edd46..63f2cde6 100644 --- a/src/lerobot/robots/realman/realman_robot_config.py +++ b/src/lerobot/robots/realman/realman_robot_config.py @@ -9,6 +9,7 @@ from lerobot.robots.config import RobotConfig class RealmanRobotConfig(RobotConfig): # Port to connect to the arm port: str + gripper_range: list[int] = [10, 990] disable_torque_on_disconnect: bool = True # cameras cameras: dict[str, RealSenseCameraConfig] = field(default_factory=dict)