From cf4bb351ed7e8f7027fd4334f4286c5ef519afd7 Mon Sep 17 00:00:00 2001 From: yehao <1002142102@qq.com> Date: Thu, 4 Dec 2025 11:37:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0realman?= =?UTF-8?q?=E5=8D=95=E8=87=82=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- calibrate.yml | 1 + pyproject.toml | 1 + realman.yml | 58 ++++ src/lerobot/__init__.py | 1 + src/lerobot/motors/realman/__init__.py | 0 src/lerobot/motors/realman/realman.py | 294 ++++++++++++++++++ .../realman/realman_motors_bus_config.py | 0 src/lerobot/realman/lerobot_robot_config.py | 17 + src/lerobot/realman/lerobot_robot_robot.py | 197 ++++++++++++ src/lerobot/robots/realman/__init__.py | 3 + src/lerobot/robots/realman/realman_robot.py | 197 ++++++++++++ .../robots/realman/realman_robot_config.py | 17 + src/lerobot/scripts/lerobot_calibrate.py | 3 +- 13 files changed, 788 insertions(+), 1 deletion(-) create mode 100644 calibrate.yml create mode 100644 realman.yml create mode 100644 src/lerobot/motors/realman/__init__.py create mode 100644 src/lerobot/motors/realman/realman.py create mode 100644 src/lerobot/motors/realman/realman_motors_bus_config.py create mode 100644 src/lerobot/realman/lerobot_robot_config.py create mode 100644 src/lerobot/realman/lerobot_robot_robot.py create mode 100644 src/lerobot/robots/realman/__init__.py create mode 100644 src/lerobot/robots/realman/realman_robot.py create mode 100644 src/lerobot/robots/realman/realman_robot_config.py diff --git a/calibrate.yml b/calibrate.yml new file mode 100644 index 000000000..5d485c7d5 --- /dev/null +++ b/calibrate.yml @@ -0,0 +1 @@ +robot: !include realman.yml \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e953820ff..db7c92aea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ dependencies = [ "deepdiff>=7.0.1,<9.0.0", "imageio[ffmpeg]>=2.34.0,<3.0.0", "termcolor>=2.4.0,<4.0.0", + "Robotic_Arm==1.0.4.1" ] # Optional dependencies diff --git a/realman.yml b/realman.yml new file mode 100644 index 000000000..a41010988 --- /dev/null +++ b/realman.yml @@ -0,0 +1,58 @@ +type: realman +port: 192.168.3.18:8080 +motors: + joint_1: + id: 1 + model: realman + norm_mode: DEGREES + joint_2: + id: 2 + model: realman + norm_mode: DEGREES + joint_3: + id: 3 + model: realman + norm_mode: DEGREES + joint_4: + id: 4 + model: realman + norm_mode: DEGREES + joint_5: + id: 5 + model: realman + norm_mode: DEGREES + joint_6: + id: 6 + model: realman + norm_mode: DEGREES + gripper: + id: 7 + model: realman + norm_mode: DEGREES +cameras: + left: + serial_number_or_name: 153122077516 + fps: 30 + width: 640 + height: 480 + use_depth: False + front: + serial_number_or_name: 145422072751 + fps: 30 + width: 640 + height: 480 + use_depth: False + high: + serial_number_or_name: 145422072193 + fps: 30 + width: 640 + height: 480 + use_depth: False +joint: + - -90 + - 90 + - 90 + - 90 + - 90 + - -90 + - 10 \ No newline at end of file diff --git a/src/lerobot/__init__.py b/src/lerobot/__init__.py index eec574296..6c4dd2245 100644 --- a/src/lerobot/__init__.py +++ b/src/lerobot/__init__.py @@ -45,6 +45,7 @@ When implementing a new policy class (e.g. `DiffusionPolicy`) follow these steps """ import itertools +import lerobot.robots.realman from lerobot.__version__ import __version__ # noqa: F401 diff --git a/src/lerobot/motors/realman/__init__.py b/src/lerobot/motors/realman/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/lerobot/motors/realman/realman.py b/src/lerobot/motors/realman/realman.py new file mode 100644 index 000000000..5be111397 --- /dev/null +++ b/src/lerobot/motors/realman/realman.py @@ -0,0 +1,294 @@ +from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address +import time +from typing import Any, Dict +from Robotic_Arm.rm_robot_interface import * +#动作执行成功 +ACTION_SUCCESS = 0 + +class RealmanMotorsBus(MotorsBus): + model_number_table={} + model_ctrl_table={ + "realman":{} + } + """ + 对Realman SDK的二次封装 + """ + def __init__( + self, + port: str, + motors:dict[str, Motor], + joint: list, + calibration: dict[str, MotorCalibration] | None = None, + ): + super().__init__(port,motors, calibration) + self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E) + address = port.split(':') + self.handle = self.rmarm.rm_create_robot_arm(address[0], int(address[1])) + self.motors = motors + self.init_joint_position = joint + self.safe_disable_position = joint + self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1) + time.sleep(3) + ret = self.rmarm.rm_get_current_arm_state() + self.init_pose = ret[1]['pose'] + + @property + def motor_names(self) -> list[str]: + return list(self.motors.keys()) + + @property + def motor_models(self) -> list[str]: + return [model for _, model in self.motors.values()] + + @property + def motor_indices(self) -> list[int]: + return [idx for idx, _ in self.motors.values()] + + + def connect(self, enable=True) -> bool: + ''' + 使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序 + ''' + enable_flag = False + loop_flag = False + # 设置超时时间(秒) + timeout = 5 + # 记录进入循环前的时间 + start_time = time.time() + elapsed_time_flag = False + + while not loop_flag: + elapsed_time = time.time() - start_time + print("--------------------") + + if enable: + # 获取机械臂状态 + ret = self.rmarm.rm_get_current_arm_state() + if ret[0] == 0: # 成功获取状态 + enable_flag = True + else: + enable_flag = False + # 断开所有连接,销毁线程 + RoboticArm.rm_destory() + print("使能状态:", enable_flag) + print("--------------------") + if(enable_flag == enable): + loop_flag = True + enable_flag = True + else: + loop_flag = False + enable_flag = False + # 检查是否超过超时时间 + if elapsed_time > timeout: + print("超时....") + elapsed_time_flag = True + enable_flag = True + break + time.sleep(1) + + resp = enable_flag + print(f"Returning response: {resp}") + return resp + + def motor_names(self): + return + + def read_calibration(self): + return + def set_calibration(self): + return + + def revert_calibration(self): + return + + def apply_calibration(self): + """ + 移动到初始位置 + """ + self.write(target_joint=self.init_joint_position) + + def write(self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0): + return False + + def write_arm(self, target_joint: list): + ret = self.rmarm.rm_movej_follow(target_joint[:-1]) + if ret != 0: + print("movej error:", ret[1]) + return ret + + ret = self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2) + return ret + + def write_endpose(self, target_endpose: list, gripper: int): + self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1) + self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2) + + def write_joint_slow(self, target_joint: list): + self.rmarm.rm_movej(target_joint, 5, 0, 0, 0) + + def write_joint_canfd(self, target_joint: list): + self.rmarm.rm_movej_canfd(target_joint, False) + + def write_endpose_canfd(self, target_pose: list): + self.rmarm.rm_movep_canfd(target_pose, False) + + def write_gripper(self, gripper: int): + self.rmarm.rm_set_gripper_position(gripper, False, 2) + + def read(self) -> Dict: + """ + - 机械臂关节消息,单位1度;[-1, 1] + - 机械臂夹爪消息,[-1, 1] + """ + joint_msg = self.rmarm.rm_get_current_arm_state()[1] + joint_state = joint_msg['joint'] + + gripper_msg = self.rmarm.rm_get_gripper_state()[1] + gripper_state = gripper_msg['actpos'] + + return { + "joint_1": joint_state[0]/180, + "joint_2": joint_state[1]/180, + "joint_3": joint_state[2]/180, + "joint_4": joint_state[3]/180, + "joint_5": joint_state[4]/180, + "joint_6": joint_state[5]/180, + "gripper": (gripper_state-500)/500 + } + + def read_current_arm_joint_state(self): + return self.rmarm.rm_get_current_arm_state()[1]['joint'] + + def read_current_arm_endpose_state(self): + return self.rmarm.rm_get_current_arm_state()[1]['pose'] + + def safe_disconnect(self): + """ + Move to safe disconnect position + """ + self.write_arm(target_joint=self.safe_disable_position) + # 断开所有连接,销毁线程 + RoboticArm.rm_destory() + + """ + 向机械臂写入动作 + """ + def sync_write(self,name,actionData: dict[str, Any]): + if name =="Goal_Position": + self.write_arm(target_joint=actionData) + + def _assert_protocol_is_compatible(self, instruction_name): + return True + + + """ + 配置电机 + """ + def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None: + for motor in self.motors: + # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on + # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). + self.write("Return_Delay_Time", motor, return_delay_time) + # Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors. + if self.protocol_version == 0: + self.write("Maximum_Acceleration", motor, maximum_acceleration) + self.write("Acceleration", motor, acceleration) + + def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0): + for motor in self._get_motors_list(motors): + self.rmarm.rm_set_joint_en_state(motor,0) + + def _disable_torque(self, motor: int, model: str, num_retry: int = 0): + self.rmarm.rm_set_joint_en_state(motor,0) + + def enable_torque(self, motors = None, num_retry = 0): + for motor in self._get_motors_list(motors): + self.rmarm.rm_set_joint_en_state(motor,1) + + @property + def is_calibrated(self) -> bool: + motors_calibration = self.read_calibration() + if set(motors_calibration) != set(self.calibration): + return False + + same_ranges = all( + self.calibration[motor].range_min == cal.range_min + and self.calibration[motor].range_max == cal.range_max + for motor, cal in motors_calibration.items() + ) + if self.protocol_version == 1: + return same_ranges + + same_offsets = all( + self.calibration[motor].homing_offset == cal.homing_offset + for motor, cal in motors_calibration.items() + ) + return same_ranges and same_offsets + + def write_calibration(self, calibration_dict, cache = True): + for motor, calibration in calibration_dict.items(): + self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min) + self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max) + if cache: + self.calibration = calibration_dict + + def _get_half_turn_homings(self, positions): + offsets = {} + for motor_id, current_pos in positions.items(): + # 确保输入是角度值 + current_deg = self._ensure_angle_degrees(current_pos, motor_id) + # 核心逻辑:让当前位置变成180° + offset = self._calculate_angle_offset(current_deg, motor_id) + offsets[motor_id] = offset + return offsets + + def _calculate_angle_offset(self, current_deg, motor_id): + """计算角度偏移量""" + + # 获取关节角度范围(从配置或SDK) + # min_angle, max_angle = self._get_joint_limits(motor_id) + + # 对于单圈关节,我们想要的目标是范围中点 + # 但set_half_turn_homings的语义是"变成半圈(180°)" + TARGET = 180.0 # 固定目标:180° + + # 计算基本偏移 + offset = TARGET - current_deg + + # 单圈关节处理:偏移应该在[-180, 180)范围内 + if motor_id <= 6: # 单圈关节 + # 优化偏移到最短路径 + while offset >= 180.0: + offset -= 360.0 + while offset < -180.0: + offset += 360.0 + else: # 多圈关节(关节7) + # 多圈关节:只需要考虑在当前圈内的偏移 + current_in_first_turn = current_deg % 360.0 + offset = TARGET - current_in_first_turn + + # 同样优化到最短路径 + if offset >= 180.0: + offset -= 360.0 + elif offset < -180.0: + offset += 360.0 + + # 保留RealMan精度 + return round(offset, 3) + + def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]: + return 0,self.motors[motor].id + + def _encode_sign(self, data_name, ids_values): + raise NotImplementedError + + def _decode_sign(self, data_name, ids_values): + raise NotImplementedError + + def _split_into_byte_chunks(self, value, length): + raise NotImplementedError + + def broadcast_ping(self, num_retry = 0, raise_on_error = False): + raise NotImplementedError + def _handshake(self) -> None: + pass diff --git a/src/lerobot/motors/realman/realman_motors_bus_config.py b/src/lerobot/motors/realman/realman_motors_bus_config.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/lerobot/realman/lerobot_robot_config.py b/src/lerobot/realman/lerobot_robot_config.py new file mode 100644 index 000000000..dcc66db35 --- /dev/null +++ b/src/lerobot/realman/lerobot_robot_config.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass, field +from lerobot.cameras.configs import CameraConfig +from lerobot.motors.motors_bus import MotorCalibration +from lerobot.robots.config import RobotConfig + +@RobotConfig.register_subclass("realman") +@dataclass +class RealmanRobotConfig(RobotConfig): + # Port to connect to the arm + port: str + disable_torque_on_disconnect: bool = True + # cameras + cameras: dict[str, CameraConfig] = field(default_factory=dict) + # Set to `True` for backward compatibility with previous policies/dataset + use_degrees: bool = False + joint: dict[str,list] + calibration: dict[str, MotorCalibration] | None = None \ No newline at end of file diff --git a/src/lerobot/realman/lerobot_robot_robot.py b/src/lerobot/realman/lerobot_robot_robot.py new file mode 100644 index 000000000..296047050 --- /dev/null +++ b/src/lerobot/realman/lerobot_robot_robot.py @@ -0,0 +1,197 @@ +import logging +from functools import cached_property +import time +from typing import Any +from lerobot.cameras.utils import make_cameras_from_configs +from lerobot.motors.motors_bus import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.realman.realman import RealmanMotorsBus +from lerobot.robots.realman.realman_robot_config import RealmanRobotConfig +from lerobot.robots.robot import Robot +from lerobot.robots.utils import ensure_safe_goal_position +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +logger = logging.getLogger(__name__) +class RealmanRobot(Robot): + config_class = RealmanRobotConfig + name = "realman" + def __init__(self, config: RealmanRobotConfig): + super().__init__(config) + self.config = config + print(config) + self.bus = RealmanMotorsBus( + port=self.config.port, + motors=config.motors, + joint=config.joint, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + + def connect(self, calibrate: bool = True) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect() + if not self.is_calibrated and calibrate: + logger.info( + "Mismatch between calibration values in the motor and the calibration file or no calibration file found" + ) + self.calibrate() + + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + if self.calibration: + # Calibration file exists, ask user whether to use it or run new calibration + user_input = input( + f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: " + ) + if user_input.strip().lower() != "c": + logger.info(f"Writing calibration file associated with the id {self.id} to the motors") + self.bus.write_calibration(self.calibration) + return + + logger.info(f"\nRunning calibration of {self}") + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings() + + full_turn_motor = "wrist_roll" + unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor] + print( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 + + self.calibration = {} + for motor, m in self.bus.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.bus.write("P_Coefficient", motor, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 32) + + if motor == "gripper": + self.bus.write("Max_Torque_Limit", motor, 500) # 50% of max torque to avoid burnout + self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout + self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded + + def setup_motors(self) -> None: + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Read arm position + start = time.perf_counter() + obs_dict = self.bus.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + """Command arm to move to a target joint configuration. + + The relative action magnitude may be clipped depending on the configuration parameter + `max_relative_target`. In this case, the action sent differs from original action. + Thus, this function always returns the action actually sent. + + Raises: + RobotDeviceNotConnectedError: if robot is not connected. + + Returns: + the action sent to the motors, potentially clipped. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.bus.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + # Send goal position to the arm + self.bus.sync_write("Goal_Position", goal_pos) + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") \ No newline at end of file diff --git a/src/lerobot/robots/realman/__init__.py b/src/lerobot/robots/realman/__init__.py new file mode 100644 index 000000000..c6c7ac011 --- /dev/null +++ b/src/lerobot/robots/realman/__init__.py @@ -0,0 +1,3 @@ +from .realman_robot import ( + RealmanRobot +) diff --git a/src/lerobot/robots/realman/realman_robot.py b/src/lerobot/robots/realman/realman_robot.py new file mode 100644 index 000000000..296047050 --- /dev/null +++ b/src/lerobot/robots/realman/realman_robot.py @@ -0,0 +1,197 @@ +import logging +from functools import cached_property +import time +from typing import Any +from lerobot.cameras.utils import make_cameras_from_configs +from lerobot.motors.motors_bus import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.realman.realman import RealmanMotorsBus +from lerobot.robots.realman.realman_robot_config import RealmanRobotConfig +from lerobot.robots.robot import Robot +from lerobot.robots.utils import ensure_safe_goal_position +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +logger = logging.getLogger(__name__) +class RealmanRobot(Robot): + config_class = RealmanRobotConfig + name = "realman" + def __init__(self, config: RealmanRobotConfig): + super().__init__(config) + self.config = config + print(config) + self.bus = RealmanMotorsBus( + port=self.config.port, + motors=config.motors, + joint=config.joint, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + + def connect(self, calibrate: bool = True) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect() + if not self.is_calibrated and calibrate: + logger.info( + "Mismatch between calibration values in the motor and the calibration file or no calibration file found" + ) + self.calibrate() + + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + if self.calibration: + # Calibration file exists, ask user whether to use it or run new calibration + user_input = input( + f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: " + ) + if user_input.strip().lower() != "c": + logger.info(f"Writing calibration file associated with the id {self.id} to the motors") + self.bus.write_calibration(self.calibration) + return + + logger.info(f"\nRunning calibration of {self}") + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings() + + full_turn_motor = "wrist_roll" + unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor] + print( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 + + self.calibration = {} + for motor, m in self.bus.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.bus.write("P_Coefficient", motor, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 32) + + if motor == "gripper": + self.bus.write("Max_Torque_Limit", motor, 500) # 50% of max torque to avoid burnout + self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout + self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded + + def setup_motors(self) -> None: + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Read arm position + start = time.perf_counter() + obs_dict = self.bus.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + """Command arm to move to a target joint configuration. + + The relative action magnitude may be clipped depending on the configuration parameter + `max_relative_target`. In this case, the action sent differs from original action. + Thus, this function always returns the action actually sent. + + Raises: + RobotDeviceNotConnectedError: if robot is not connected. + + Returns: + the action sent to the motors, potentially clipped. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.bus.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + # Send goal position to the arm + self.bus.sync_write("Goal_Position", goal_pos) + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") \ No newline at end of file diff --git a/src/lerobot/robots/realman/realman_robot_config.py b/src/lerobot/robots/realman/realman_robot_config.py new file mode 100644 index 000000000..6e9edd46b --- /dev/null +++ b/src/lerobot/robots/realman/realman_robot_config.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass, field +from lerobot.cameras.configs import CameraConfig +from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig +from lerobot.motors.motors_bus import Motor, MotorCalibration +from lerobot.robots.config import RobotConfig + +@RobotConfig.register_subclass("realman") +@dataclass +class RealmanRobotConfig(RobotConfig): + # Port to connect to the arm + port: str + disable_torque_on_disconnect: bool = True + # cameras + cameras: dict[str, RealSenseCameraConfig] = field(default_factory=dict) + joint: list=field(default_factory=list) + motors: dict[str, Motor] = field(default_factory=dict) + calibration: dict[str, MotorCalibration] | None = None \ No newline at end of file diff --git a/src/lerobot/scripts/lerobot_calibrate.py b/src/lerobot/scripts/lerobot_calibrate.py index 0f247caef..5bee93c86 100644 --- a/src/lerobot/scripts/lerobot_calibrate.py +++ b/src/lerobot/scripts/lerobot_calibrate.py @@ -41,7 +41,7 @@ from lerobot.robots import ( # noqa: F401 lekiwi, make_robot_from_config, so100_follower, - so101_follower, + so101_follower ) from lerobot.teleoperators import ( # noqa: F401 Teleoperator, @@ -71,6 +71,7 @@ class CalibrateConfig: @draccus.wrap() def calibrate(cfg: CalibrateConfig): init_logging() + logging.info(pformat(asdict(cfg))) if isinstance(cfg.device, RobotConfig):