import time from typing import Any, Dict from Robotic_Arm.rm_robot_interface import * from lerobot.motors import Motor, MotorCalibration, MotorsBus #动作执行成功 ACTION_SUCCESS = 0 class RoboticArmMock: def __init__(self,*args): pass def rm_create_robot_arm(self, ip, port): print(f"connnect {ip}:{port}") return {"id":1} def rm_movej(self,*args): print(f"rm_movej:{args}") def rm_movej_follow(self,*args): print(f"rm_movej_follow:{args}") def rm_get_current_arm_state(self): print("get_current_arm_state") return ACTION_SUCCESS, {"pose":[0.0,0.0,0.0,0.0,0.0,0.0],"joint":[0.0,0.0,0.0,0.0,0.0,0.0]} def rm_movej_p(self,*args): print(f"rm_movej_p:{args}") def rm_set_joint_en_state(self,*args): print(f"rm_set_joint_en_state:{args}") def rm_set_gripper_route(self,*args): print(f"rm_set_gripper_route:{args}") def rm_set_gripper_position(self,*kargs): print(f"rm_set_gripper_position:{kargs}") def rm_get_gripper_state(self): print("get_gripper_state") return ACTION_SUCCESS, {"actpos":0.0} def rm_set_gripper_release(self,*args): print(f"rm_set_gripper_release:{args}") def rm_set_gripper_pick_on(self,*args): print(f"rm_set_gripper_pick_on:{args}") def rm_movev_canfd(self,*args): print(f"rm_movev_canfd:{args}") def rm_destory(self): print("destory") class RealmanMotorsBus(MotorsBus): model_number_table={} model_ctrl_table={ "realman":{} } ip: str port: int """ 对Realman SDK的二次封装 """ def __init__( self, port: str, motors:dict[str, Motor], gripper_range:list[int], joint: list, mock: bool = False, calibration: dict[str, MotorCalibration] | None = None, ): super().__init__(port,motors, calibration) if mock: self.rmarm = RoboticArmMock() else: self.rmarm = RoboticArm(rm_thread_mode_e.RM_DUAL_MODE_E) address = port.split(':') self.ip = address[0] self.port = int(address[1]) self.motors = motors self.gripper_range = gripper_range self.init_joint_position = joint self.safe_disable_position = joint self.handle = None self.init_pose = None @property def motor_names(self) -> list[str]: return list(self.motors.keys()) @property def motor_models(self) -> list[str]: return [model for _, model in self.motors.values()] @property def motor_indices(self) -> list[int]: return [idx for idx, _ in self.motors.values()] def connect(self, enable=True) -> bool: ''' 使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序 ''' self.handle = self.rmarm.rm_create_robot_arm(self.ip, self.port) self.rmarm.rm_set_arm_run_mode(0) enable_flag = False loop_flag = False # 设置超时时间(秒) timeout = 5 # 记录进入循环前的时间 start_time = time.time() elapsed_time_flag = False while not loop_flag: elapsed_time = time.time() - start_time print("--------------------") if enable: # 获取机械臂状态 ret = self.rmarm.rm_get_current_arm_state() if ret[0] == 0: # 成功获取状态 enable_flag = True else: enable_flag = False # 断开所有连接,销毁线程 RoboticArm.rm_destory() print("使能状态:", enable_flag) print("--------------------") if(enable_flag == enable): loop_flag = True enable_flag = True else: loop_flag = False enable_flag = False # 检查是否超过超时时间 if elapsed_time > timeout: print("超时....") elapsed_time_flag = True enable_flag = True break time.sleep(1) self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1) time.sleep(3) ret = self.rmarm.rm_get_current_arm_state() self.init_pose = ret[1]['pose'] resp = enable_flag print(f"Returning response: {resp}") return resp def motor_names(self): return def read_calibration(self): return def set_calibration(self): return def revert_calibration(self): return def apply_calibration(self): """ 移动到初始位置 """ self.write_arm(target_joint=self.init_joint_position) def write(self, data_name: str, motor: str, value, *, normalize: bool = True, num_retry: int = 0): return False def write_arm(self, target_joint: list): ret = self.rmarm.rm_movej_follow(target_joint[:-1]) if ret != None: print("movej error:", ret[1]) return ret ret = self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2) return ret def read(self) -> Dict: """ - 机械臂关节消息,单位1度;[-1, 1] - 机械臂夹爪消息,[-1, 1] """ joint_msg = self.rmarm.rm_get_current_arm_state()[1] joint_state = joint_msg['joint'] gripper_msg = self.rmarm.rm_get_gripper_state()[1] gripper_state = gripper_msg['actpos'] return { "joint_1": joint_state[0]/180, "joint_2": joint_state[1]/180, "joint_3": joint_state[2]/180, "joint_4": joint_state[3]/180, "joint_5": joint_state[4]/180, "joint_6": joint_state[5]/180, "gripper": (gripper_state-500)/500 } def read_current_arm_joint_state(self): return self.rmarm.rm_get_current_arm_state()[1]['joint'] def read_current_arm_endpose_state(self): return self.rmarm.rm_get_current_arm_state()[1]['pose'] def disconnect(self, disable_torque = True): self.safe_disconnect() def safe_disconnect(self): """ Move to safe disconnect position """ self.write_arm(target_joint=self.safe_disable_position) # 断开所有连接,销毁线程 RoboticArm.rm_destory() """ 向机械臂写入动作 """ def sync_write(self,name,actionData: dict[str, Any]): if name =="Goal_Position": self.write_arm(target_joint=actionData) elif name == "Goal_Velocity":#速度透传模式 self.rmarm.rm_movev_canfd(list(actionData.values())[:6]) gripper = actionData['gripper'] if gripper > 0 : self.rmarm.rm_set_gripper_release(gripper*1000, False) else: self.rmarm.rm_set_gripper_pick_on(gripper*1000, gripper*1000, False) def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0): if data_name == "Present_Position": return self.read() def _assert_protocol_is_compatible(self, instruction_name): return True """ 配置电机 """ def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None: self.rmarm.rm_set_gripper_route(self.gripper_range[0],self.gripper_range[1]) def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0): for motor in self._get_motors_list(motors): self.rmarm.rm_set_joint_en_state(motor,0) def _disable_torque(self, motor: int, model: str, num_retry: int = 0): self.rmarm.rm_set_joint_en_state(motor,0) def enable_torque(self, motors = None, num_retry = 0): for motor in self._get_motors_list(motors): self.rmarm.rm_set_joint_en_state(motor,1) @property def is_connected(self) -> bool: return self.handle is not None @property def is_calibrated(self) -> bool: return True def write_calibration(self, calibration_dict, cache = True): pass def _get_half_turn_homings(self, positions): offsets = {} for motor_id, current_pos in positions.items(): # 确保输入是角度值 current_deg = self._ensure_angle_degrees(current_pos, motor_id) # 核心逻辑:让当前位置变成180° offset = self._calculate_angle_offset(current_deg, motor_id) offsets[motor_id] = offset return offsets def _calculate_angle_offset(self, current_deg, motor_id): """计算角度偏移量""" # 获取关节角度范围(从配置或SDK) # min_angle, max_angle = self._get_joint_limits(motor_id) # 对于单圈关节,我们想要的目标是范围中点 # 但set_half_turn_homings的语义是"变成半圈(180°)" TARGET = 180.0 # 固定目标:180° # 计算基本偏移 offset = TARGET - current_deg # 单圈关节处理:偏移应该在[-180, 180)范围内 if motor_id <= 6: # 单圈关节 # 优化偏移到最短路径 while offset >= 180.0: offset -= 360.0 while offset < -180.0: offset += 360.0 else: # 多圈关节(关节7) # 多圈关节:只需要考虑在当前圈内的偏移 current_in_first_turn = current_deg % 360.0 offset = TARGET - current_in_first_turn # 同样优化到最短路径 if offset >= 180.0: offset -= 360.0 elif offset < -180.0: offset += 360.0 # 保留RealMan精度 return round(offset, 3) def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]: return 0,self.motors[motor].id def _encode_sign(self, data_name, ids_values): raise NotImplementedError def _decode_sign(self, data_name, ids_values): raise NotImplementedError def _split_into_byte_chunks(self, value, length): raise NotImplementedError def broadcast_ping(self, num_retry = 0, raise_on_error = False): raise NotImplementedError def _handshake(self) -> None: pass