Files
lerobot/robot_client/robots/realman/motors_bus.py
2025-12-10 14:46:21 +08:00

280 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import Any, Dict
from Robotic_Arm.rm_robot_interface import RoboticArm,rm_thread_mode_e
from lerobot.motors import Motor, MotorCalibration, MotorsBus
#动作执行成功
ACTION_SUCCESS = 0
class RealmanMotorsBus(MotorsBus):
model_number_table={}
model_ctrl_table={
"realman":{}
}
ip: str
port: int
"""
对Realman SDK的二次封装
"""
def __init__(
self,
port: str,
motors:dict[str, Motor],
gripper_range:list[int],
joint: list,
mock: bool = False,
calibration: dict[str, MotorCalibration] | None = None,
):
super().__init__(port,motors, calibration)
address = port.split(':')
self.ip = address[0]
self.port = int(address[1])
self.motors = motors
self.gripper_range = gripper_range
self.init_joint_position = joint
self.safe_disable_position = joint
self.mock = mock
self.rmarm = RoboticArm(rm_thread_mode_e.RM_DUAL_MODE_E)
self.handle = None
self.init_pose = None
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def connect(self, enable=True) -> bool:
'''
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
'''
self.handle = self.rmarm.rm_create_robot_arm(self.ip, self.port)
if self.mock:
self.rmarm.rm_set_arm_run_mode(0)#仿真模式
else:
self.rmarm.rm_set_arm_run_mode(1)
enable_flag = False
loop_flag = False
# 设置超时时间(秒)
timeout = 5
# 记录进入循环前的时间
start_time = time.time()
elapsed_time_flag = False
while not loop_flag:
elapsed_time = time.time() - start_time
print("--------------------")
if enable:
# 获取机械臂状态
ret = self.rmarm.rm_get_current_arm_state()
if ret[0] == 0: # 成功获取状态
enable_flag = True
else:
enable_flag = False
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
print("使能状态:", enable_flag)
print("--------------------")
if(enable_flag == enable):
loop_flag = True
enable_flag = True
else:
loop_flag = False
enable_flag = False
# 检查是否超过超时时间
if elapsed_time > timeout:
print("超时....")
elapsed_time_flag = True
enable_flag = True
break
time.sleep(1)
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
self.init_pose = ret[1]['pose']
#读取夹状态
gripper_msg = self.rmarm.rm_get_gripper_state()[1]
gripper_state = gripper_msg['actpos']
self.init_pose.append(gripper_state)
resp = enable_flag
print(f"Returning response: {resp}")
return resp
def motor_names(self):
return
def read_calibration(self):
return
def set_calibration(self):
return
def revert_calibration(self):
return
def apply_calibration(self):
"""
移动到初始位置
"""
self.write_arm(target_joint=self.init_joint_position)
def write(self, data_name: str, motor: str, value, *, normalize: bool = True, num_retry: int = 0):
return False
def write_arm(self, target_joint: list):
ret = self.rmarm.rm_movej_follow(target_joint[:-1])
if ret != 0:
print("movej error:", ret)
return ret
ret = self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
return ret
def read(self) -> Dict:
"""
- 机械臂关节消息,单位1度;[-1, 1]
- 机械臂夹爪消息,[-1, 1]
"""
joint_msg = self.rmarm.rm_get_current_arm_state()[1]
joint_state = joint_msg['joint']
gripper_msg = self.rmarm.rm_get_gripper_state()[1]
gripper_state = gripper_msg['actpos']
return {
"joint_1": joint_state[0]/180,
"joint_2": joint_state[1]/180,
"joint_3": joint_state[2]/180,
"joint_4": joint_state[3]/180,
"joint_5": joint_state[4]/180,
"joint_6": joint_state[5]/180,
"gripper": (gripper_state-500)/500
}
def disconnect(self, disable_torque = True):
self.safe_disconnect()
def safe_disconnect(self):
"""
Move to safe disconnect position
"""
self.write_arm(target_joint=self.safe_disable_position)
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
"""
向机械臂写入动作
"""
def sync_write(self,name,actionData: dict[str, Any]):
if name =="Goal_Position":
self.write_arm(target_joint=actionData)
elif name == "Goal_Velocity":#速度透传模式
values = list(actionData.values())
for k,v in enumerate(self.init_pose):
self.init_pose[k]+=values[k]
r = self.rmarm.rm_movej_p(self.init_pose[:-1], 50, 0, 0, 0)
print(f"r:{r},pose:{self.init_pose}")
# self.init_pose[6]+=actionData['gripper']
# self.rmarm.rm_set_gripper_position(int(self.init_pose[6]*1000), False, 0)
def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0):
if data_name == "Present_Position":
return self.read()
def _assert_protocol_is_compatible(self, instruction_name):
return True
"""
配置电机
"""
def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None:
self.rmarm.rm_set_gripper_route(self.gripper_range[0],self.gripper_range[1])
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0):
for motor in self._get_motors_list(motors):
self.rmarm.rm_set_joint_en_state(motor,0)
def _disable_torque(self, motor: int, model: str, num_retry: int = 0):
self.rmarm.rm_set_joint_en_state(motor,0)
def enable_torque(self, motors = None, num_retry = 0):
for motor in self._get_motors_list(motors):
self.rmarm.rm_set_joint_en_state(motor,1)
@property
def is_connected(self) -> bool:
return self.handle is not None
@property
def is_calibrated(self) -> bool:
return True
def write_calibration(self, calibration_dict, cache = True):
pass
def _get_half_turn_homings(self, positions):
offsets = {}
for motor_id, current_pos in positions.items():
# 确保输入是角度值
current_deg = self._ensure_angle_degrees(current_pos, motor_id)
# 核心逻辑让当前位置变成180°
offset = self._calculate_angle_offset(current_deg, motor_id)
offsets[motor_id] = offset
return offsets
def _calculate_angle_offset(self, current_deg, motor_id):
"""计算角度偏移量"""
# 获取关节角度范围从配置或SDK
# min_angle, max_angle = self._get_joint_limits(motor_id)
# 对于单圈关节,我们想要的目标是范围中点
# 但set_half_turn_homings的语义是"变成半圈(180°)"
TARGET = 180.0 # 固定目标180°
# 计算基本偏移
offset = TARGET - current_deg
# 单圈关节处理:偏移应该在[-180, 180)范围内
if motor_id <= 6: # 单圈关节
# 优化偏移到最短路径
while offset >= 180.0:
offset -= 360.0
while offset < -180.0:
offset += 360.0
else: # 多圈关节关节7
# 多圈关节:只需要考虑在当前圈内的偏移
current_in_first_turn = current_deg % 360.0
offset = TARGET - current_in_first_turn
# 同样优化到最短路径
if offset >= 180.0:
offset -= 360.0
elif offset < -180.0:
offset += 360.0
# 保留RealMan精度
return round(offset, 3)
def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
return 0,self.motors[motor].id
def _encode_sign(self, data_name, ids_values):
pass
def _decode_sign(self, data_name, ids_values):
pass
def _split_into_byte_chunks(self, value, length):
pass
def broadcast_ping(self, num_retry = 0, raise_on_error = False):
pass
def _handshake(self) -> None:
pass