Compare commits

..

2 Commits

Author SHA1 Message Date
ba2b3c217a 提交代码 2025-12-04 15:11:07 +08:00
cf4bb351ed 初步实现realman单臂接口 2025-12-04 11:37:55 +08:00
13 changed files with 742 additions and 1 deletions

1
calibrate.yml Normal file
View File

@@ -0,0 +1 @@
robot: !include realman.yml

View File

@@ -88,6 +88,7 @@ dependencies = [
"deepdiff>=7.0.1,<9.0.0",
"imageio[ffmpeg]>=2.34.0,<3.0.0",
"termcolor>=2.4.0,<4.0.0",
"Robotic_Arm==1.0.4.1"
]
# Optional dependencies

61
realman.yml Normal file
View File

@@ -0,0 +1,61 @@
type: realman
port: 192.168.3.18:8080
gripper_range:
- 10
- 990
motors:
joint_1:
id: 1
model: realman
norm_mode: DEGREES
joint_2:
id: 2
model: realman
norm_mode: DEGREES
joint_3:
id: 3
model: realman
norm_mode: DEGREES
joint_4:
id: 4
model: realman
norm_mode: DEGREES
joint_5:
id: 5
model: realman
norm_mode: DEGREES
joint_6:
id: 6
model: realman
norm_mode: DEGREES
gripper:
id: 7
model: realman
norm_mode: DEGREES
cameras:
left:
serial_number_or_name: 153122077516
fps: 30
width: 640
height: 480
use_depth: False
front:
serial_number_or_name: 145422072751
fps: 30
width: 640
height: 480
use_depth: False
high:
serial_number_or_name: 145422072193
fps: 30
width: 640
height: 480
use_depth: False
joint:
- -90
- 90
- 90
- 90
- 90
- -90
- 10

View File

@@ -45,6 +45,7 @@ When implementing a new policy class (e.g. `DiffusionPolicy`) follow these steps
"""
import itertools
import lerobot.robots.realman
from lerobot.__version__ import __version__ # noqa: F401

View File

View File

@@ -0,0 +1,295 @@
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address
import time
from typing import Any, Dict
from Robotic_Arm.rm_robot_interface import *
#动作执行成功
ACTION_SUCCESS = 0
class RealmanMotorsBus(MotorsBus):
model_number_table={}
model_ctrl_table={
"realman":{}
}
"""
对Realman SDK的二次封装
"""
def __init__(
self,
port: str,
motors:dict[str, Motor],
gripper_range:list[int],
joint: list,
calibration: dict[str, MotorCalibration] | None = None,
):
super().__init__(port,motors, calibration)
self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
address = port.split(':')
self.handle = self.rmarm.rm_create_robot_arm(address[0], int(address[1]))
self.motors = motors
self.gripper_range = gripper_range
self.init_joint_position = joint
self.safe_disable_position = joint
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
time.sleep(3)
ret = self.rmarm.rm_get_current_arm_state()
self.init_pose = ret[1]['pose']
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def connect(self, enable=True) -> bool:
'''
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
'''
enable_flag = False
loop_flag = False
# 设置超时时间(秒)
timeout = 5
# 记录进入循环前的时间
start_time = time.time()
elapsed_time_flag = False
while not loop_flag:
elapsed_time = time.time() - start_time
print("--------------------")
if enable:
# 获取机械臂状态
ret = self.rmarm.rm_get_current_arm_state()
if ret[0] == 0: # 成功获取状态
enable_flag = True
else:
enable_flag = False
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
print("使能状态:", enable_flag)
print("--------------------")
if(enable_flag == enable):
loop_flag = True
enable_flag = True
else:
loop_flag = False
enable_flag = False
# 检查是否超过超时时间
if elapsed_time > timeout:
print("超时....")
elapsed_time_flag = True
enable_flag = True
break
time.sleep(1)
resp = enable_flag
print(f"Returning response: {resp}")
return resp
def motor_names(self):
return
def read_calibration(self):
return
def set_calibration(self):
return
def revert_calibration(self):
return
def apply_calibration(self):
"""
移动到初始位置
"""
self.write_arm(target_joint=self.init_joint_position)
def write(self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0):
return False
def write_arm(self, target_joint: list):
ret = self.rmarm.rm_movej_follow(target_joint[:-1])
if ret != 0:
print("movej error:", ret[1])
return ret
ret = self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
return ret
def write_endpose(self, target_endpose: list, gripper: int):
self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1)
self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2)
def write_joint_slow(self, target_joint: list):
self.rmarm.rm_movej(target_joint, 5, 0, 0, 0)
def write_joint_canfd(self, target_joint: list):
self.rmarm.rm_movej_canfd(target_joint, False)
def write_endpose_canfd(self, target_pose: list):
self.rmarm.rm_movep_canfd(target_pose, False)
def write_gripper(self, gripper: int):
self.rmarm.rm_set_gripper_position(gripper, False, 2)
def read(self) -> Dict:
"""
- 机械臂关节消息,单位1度;[-1, 1]
- 机械臂夹爪消息,[-1, 1]
"""
joint_msg = self.rmarm.rm_get_current_arm_state()[1]
joint_state = joint_msg['joint']
gripper_msg = self.rmarm.rm_get_gripper_state()[1]
gripper_state = gripper_msg['actpos']
return {
"joint_1": joint_state[0]/180,
"joint_2": joint_state[1]/180,
"joint_3": joint_state[2]/180,
"joint_4": joint_state[3]/180,
"joint_5": joint_state[4]/180,
"joint_6": joint_state[5]/180,
"gripper": (gripper_state-500)/500
}
def read_current_arm_joint_state(self):
return self.rmarm.rm_get_current_arm_state()[1]['joint']
def read_current_arm_endpose_state(self):
return self.rmarm.rm_get_current_arm_state()[1]['pose']
def safe_disconnect(self):
"""
Move to safe disconnect position
"""
self.write_arm(target_joint=self.safe_disable_position)
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
"""
向机械臂写入动作
"""
def sync_write(self,name,actionData: dict[str, Any]):
if name =="Goal_Position":
self.write_arm(target_joint=actionData)
def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0):
if data_name == "Present_Position":
return self.read()
def _assert_protocol_is_compatible(self, instruction_name):
return True
"""
配置电机
"""
def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None:
self.rmarm.rm_set_gripper_route(self.gripper_range[0],self.gripper_range[1])
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0):
for motor in self._get_motors_list(motors):
self.rmarm.rm_set_joint_en_state(motor,0)
def _disable_torque(self, motor: int, model: str, num_retry: int = 0):
self.rmarm.rm_set_joint_en_state(motor,0)
def enable_torque(self, motors = None, num_retry = 0):
for motor in self._get_motors_list(motors):
self.rmarm.rm_set_joint_en_state(motor,1)
@property
def is_calibrated(self) -> bool:
return True
# motors_calibration = self.read_calibration()
# if set(motors_calibration) != set(self.calibration):
# return False
# same_ranges = all(
# self.calibration[motor].range_min == cal.range_min
# and self.calibration[motor].range_max == cal.range_max
# for motor, cal in motors_calibration.items()
# )
# if self.protocol_version == 1:
# return same_ranges
# same_offsets = all(
# self.calibration[motor].homing_offset == cal.homing_offset
# for motor, cal in motors_calibration.items()
# )
# return same_ranges and same_offsets
def write_calibration(self, calibration_dict, cache = True):
pass
# for motor, calibration in calibration_dict.items():
# self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min)
# self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max)
# if cache:
# self.calibration = calibration_dict
def _get_half_turn_homings(self, positions):
offsets = {}
for motor_id, current_pos in positions.items():
# 确保输入是角度值
current_deg = self._ensure_angle_degrees(current_pos, motor_id)
# 核心逻辑让当前位置变成180°
offset = self._calculate_angle_offset(current_deg, motor_id)
offsets[motor_id] = offset
return offsets
def _calculate_angle_offset(self, current_deg, motor_id):
"""计算角度偏移量"""
# 获取关节角度范围从配置或SDK
# min_angle, max_angle = self._get_joint_limits(motor_id)
# 对于单圈关节,我们想要的目标是范围中点
# 但set_half_turn_homings的语义是"变成半圈(180°)"
TARGET = 180.0 # 固定目标180°
# 计算基本偏移
offset = TARGET - current_deg
# 单圈关节处理:偏移应该在[-180, 180)范围内
if motor_id <= 6: # 单圈关节
# 优化偏移到最短路径
while offset >= 180.0:
offset -= 360.0
while offset < -180.0:
offset += 360.0
else: # 多圈关节关节7
# 多圈关节:只需要考虑在当前圈内的偏移
current_in_first_turn = current_deg % 360.0
offset = TARGET - current_in_first_turn
# 同样优化到最短路径
if offset >= 180.0:
offset -= 360.0
elif offset < -180.0:
offset += 360.0
# 保留RealMan精度
return round(offset, 3)
def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
return 0,self.motors[motor].id
def _encode_sign(self, data_name, ids_values):
raise NotImplementedError
def _decode_sign(self, data_name, ids_values):
raise NotImplementedError
def _split_into_byte_chunks(self, value, length):
raise NotImplementedError
def broadcast_ping(self, num_retry = 0, raise_on_error = False):
raise NotImplementedError
def _handshake(self) -> None:
pass

View File

@@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from lerobot.cameras.configs import CameraConfig
from lerobot.motors.motors_bus import MotorCalibration
from lerobot.robots.config import RobotConfig
@RobotConfig.register_subclass("realman")
@dataclass
class RealmanRobotConfig(RobotConfig):
# Port to connect to the arm
port: str
disable_torque_on_disconnect: bool = True
# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Set to `True` for backward compatibility with previous policies/dataset
use_degrees: bool = False
joint: dict[str,list]
calibration: dict[str, MotorCalibration] | None = None

View File

@@ -0,0 +1,197 @@
import logging
from functools import cached_property
import time
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors.motors_bus import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.realman.realman import RealmanMotorsBus
from lerobot.robots.realman.realman_robot_config import RealmanRobotConfig
from lerobot.robots.robot import Robot
from lerobot.robots.utils import ensure_safe_goal_position
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
logger = logging.getLogger(__name__)
class RealmanRobot(Robot):
config_class = RealmanRobotConfig
name = "realman"
def __init__(self, config: RealmanRobotConfig):
super().__init__(config)
self.config = config
print(config)
self.bus = RealmanMotorsBus(
port=self.config.port,
motors=config.motors,
joint=config.joint,
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
if self.calibration:
# Calibration file exists, ask user whether to use it or run new calibration
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor]
print(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.bus.write("P_Coefficient", motor, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 32)
if motor == "gripper":
self.bus.write("Max_Torque_Limit", motor, 500) # 50% of max torque to avoid burnout
self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout
self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,3 @@
from .realman_robot import (
RealmanRobot
)

View File

@@ -0,0 +1,146 @@
import logging
from functools import cached_property
import time
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors.motors_bus import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.realman.realman import RealmanMotorsBus
from lerobot.robots.realman.realman_robot_config import RealmanRobotConfig
from lerobot.robots.robot import Robot
from lerobot.robots.utils import ensure_safe_goal_position
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
logger = logging.getLogger(__name__)
class RealmanRobot(Robot):
config_class = RealmanRobotConfig
name = "realman"
def __init__(self, config: RealmanRobotConfig):
super().__init__(config)
self.config = config
self.bus = RealmanMotorsBus(
port=self.config.port,
motors=config.motors,
joint=config.joint,
gripper_range=config.gripper_range,
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
for cam in self.cameras.values():
cam.connect()
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
"""move piper to the home position"""
if not self.is_connected:
raise ConnectionError()
self.bus.apply_calibration()
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
def setup_motors(self) -> None:
pass
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,18 @@
from dataclasses import dataclass, field
from lerobot.cameras.configs import CameraConfig
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig
from lerobot.motors.motors_bus import Motor, MotorCalibration
from lerobot.robots.config import RobotConfig
@RobotConfig.register_subclass("realman")
@dataclass
class RealmanRobotConfig(RobotConfig):
# Port to connect to the arm
port: str
gripper_range: list[int] = [10, 990]
disable_torque_on_disconnect: bool = True
# cameras
cameras: dict[str, RealSenseCameraConfig] = field(default_factory=dict)
joint: list=field(default_factory=list)
motors: dict[str, Motor] = field(default_factory=dict)
calibration: dict[str, MotorCalibration] | None = None

View File

@@ -41,7 +41,7 @@ from lerobot.robots import ( # noqa: F401
lekiwi,
make_robot_from_config,
so100_follower,
so101_follower,
so101_follower
)
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
@@ -71,6 +71,7 @@ class CalibrateConfig:
@draccus.wrap()
def calibrate(cfg: CalibrateConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
if isinstance(cfg.device, RobotConfig):