forked from tangger/lerobot
128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
import time
|
|
from typing import Dict
|
|
from lerobot.common.robot_devices.motors.configs import RealmanMotorsBusConfig
|
|
from Robotic_Arm.rm_robot_interface import *
|
|
|
|
|
|
class RealmanMotorsBus:
|
|
"""
|
|
对Realman SDK的二次封装
|
|
"""
|
|
def __init__(self,
|
|
config: RealmanMotorsBusConfig):
|
|
self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
|
|
self.handle = self.rmarm.rm_create_robot_arm(config.ip, config.port)
|
|
self.motors = config.motors
|
|
self.init_joint_position = config.init_joint['joint'] # [6 joints + 1 gripper]
|
|
self.safe_disable_position = config.init_joint['joint']
|
|
|
|
@property
|
|
def motor_names(self) -> list[str]:
|
|
return list(self.motors.keys())
|
|
|
|
@property
|
|
def motor_models(self) -> list[str]:
|
|
return [model for _, model in self.motors.values()]
|
|
|
|
@property
|
|
def motor_indices(self) -> list[int]:
|
|
return [idx for idx, _ in self.motors.values()]
|
|
|
|
|
|
def connect(self, enable=True) -> bool:
|
|
'''
|
|
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
|
|
'''
|
|
enable_flag = False
|
|
loop_flag = False
|
|
# 设置超时时间(秒)
|
|
timeout = 5
|
|
# 记录进入循环前的时间
|
|
start_time = time.time()
|
|
elapsed_time_flag = False
|
|
|
|
while not loop_flag:
|
|
elapsed_time = time.time() - start_time
|
|
print("--------------------")
|
|
|
|
if enable:
|
|
# 获取机械臂状态
|
|
ret = self.rmarm.rm_get_current_arm_state()
|
|
if ret[0] == 0: # 成功获取状态
|
|
enable_flag = True
|
|
else:
|
|
enable_flag = False
|
|
# 断开所有连接,销毁线程
|
|
RoboticArm.rm_destory()
|
|
print("使能状态:", enable_flag)
|
|
print("--------------------")
|
|
if(enable_flag == enable):
|
|
loop_flag = True
|
|
enable_flag = True
|
|
else:
|
|
loop_flag = False
|
|
enable_flag = False
|
|
# 检查是否超过超时时间
|
|
if elapsed_time > timeout:
|
|
print("超时....")
|
|
elapsed_time_flag = True
|
|
enable_flag = True
|
|
break
|
|
time.sleep(1)
|
|
|
|
resp = enable_flag
|
|
print(f"Returning response: {resp}")
|
|
return resp
|
|
|
|
def motor_names(self):
|
|
return
|
|
|
|
def set_calibration(self):
|
|
return
|
|
|
|
def revert_calibration(self):
|
|
return
|
|
|
|
def apply_calibration(self):
|
|
"""
|
|
移动到初始位置
|
|
"""
|
|
self.write(target_joint=self.init_joint_position)
|
|
|
|
def write(self, target_joint:list):
|
|
# self.rmarm.rm_movej(target_joint[:-1], 50, 0, 0, 1)
|
|
self.rmarm.rm_movej_follow(target_joint[:-1])
|
|
self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
|
|
|
|
def write_endpose(self, target_endpose: list, gripper: int):
|
|
self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1)
|
|
self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2)
|
|
|
|
def read(self) -> Dict:
|
|
"""
|
|
- 机械臂关节消息,单位1度;[-1, 1]
|
|
- 机械臂夹爪消息,[-1, 1]
|
|
"""
|
|
joint_msg = self.rmarm.rm_get_current_arm_state()[1]
|
|
joint_state = joint_msg['joint']
|
|
|
|
gripper_msg = self.rmarm.rm_get_gripper_state()[1]
|
|
gripper_state = gripper_msg['actpos']
|
|
|
|
return {
|
|
"joint_1": joint_state[0]/180,
|
|
"joint_2": joint_state[1]/180,
|
|
"joint_3": joint_state[2]/180,
|
|
"joint_4": joint_state[3]/180,
|
|
"joint_5": joint_state[4]/180,
|
|
"joint_6": joint_state[5]/180,
|
|
"gripper": (gripper_state-500)/500
|
|
}
|
|
|
|
def safe_disconnect(self):
|
|
"""
|
|
Move to safe disconnect position
|
|
"""
|
|
self.write(target_joint=self.safe_disable_position)
|
|
# 断开所有连接,销毁线程
|
|
RoboticArm.rm_destory() |